You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2019/01/09 09:41:31 UTC

[GitHub] sk0x50 closed pull request #5776: IGNITE-5003 run all tests

sk0x50 closed pull request #5776: IGNITE-5003 run all tests
URL: https://github.com/apache/ignite/pull/5776
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index fdf3649fbd8f..c64dbc81815a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -439,10 +439,24 @@ public void forceFlush() throws IgniteCheckedException {
                 val.readLock().lock();
 
                 try {
-                    if (val.operation() == StoreOperation.PUT)
-                        loaded.put(key, val.entry().getValue());
+                    StoreOperation op;
+
+                    V value;
+
+                    if (writeCoalescing && val.nextOperation() != null) {
+                        op = val.nextOperation();
+
+                        value = (op == StoreOperation.PUT) ? val.nextEntry().getValue() : null;
+                    } else {
+                        op = val.operation();
+
+                        value = (op == StoreOperation.PUT) ? val.entry().getValue() : null;
+                    }
+
+                    if (op == StoreOperation.PUT)
+                        loaded.put(key, value);
                     else
-                        assert val.operation() == StoreOperation.RMV : val.operation();
+                        assert op == StoreOperation.RMV : op;
                 }
                 finally {
                     val.readLock().unlock();
@@ -484,9 +498,23 @@ public void forceFlush() throws IgniteCheckedException {
             val.readLock().lock();
 
             try {
-                switch (val.operation()) {
+                StoreOperation op;
+
+                V value;
+
+                if (writeCoalescing && val.nextOperation() != null) {
+                    op = val.nextOperation();
+
+                    value = (op == StoreOperation.PUT) ? val.nextEntry().getValue() : null;
+                } else {
+                    op = val.operation();
+
+                    value = (op == StoreOperation.PUT) ? val.entry().getValue() : null;
+                }
+
+                switch (op) {
                     case PUT:
-                        return val.entry().getValue();
+                        return value;
 
                     case RMV:
                         return null;
@@ -593,11 +621,14 @@ private void putToWriteCache(
             prev.writeLock().lock();
 
             try {
-                if (prev.status() == ValueStatus.PENDING) {
-                    // Flush process in progress, try again.
-                    prev.waitForFlush();
+                if (prev.status() == ValueStatus.PENDING || prev.status() == ValueStatus.PENDING_AND_UPDATED) {
+                    // Flush process in progress, save next value and update the status.
 
-                    continue;
+                    prev.setNext(newVal.val, newVal.storeOperation);
+
+                    prev.status(ValueStatus.PENDING_AND_UPDATED);
+
+                    break;
                 }
                 else if (prev.status() == ValueStatus.FLUSHED)
                     // This entry was deleted from map before we acquired the lock.
@@ -712,14 +743,22 @@ private boolean applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes,
         Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size());
 
         for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
-            if (operation == null)
-                operation = e.getValue().operation();
+            StatefulValue<K, V> val = e.getValue();
 
-            assert operation == e.getValue().operation();
+            val.readLock().lock();
 
-            assert e.getValue().status() == ValueStatus.PENDING;
+            try {
+                if (operation == null)
+                    operation = val.operation();
+
+                assert operation == val.operation();
 
-            batch.put(e.getKey(), e.getValue().entry());
+                assert val.status() == ValueStatus.PENDING || val.status() == ValueStatus.PENDING_AND_UPDATED;
+
+                batch.put(e.getKey(), val.entry());
+            } finally {
+                val.readLock().unlock();
+            }
         }
 
         boolean result = updateStore(operation, batch, initSes, flusher);
@@ -731,17 +770,26 @@ private boolean applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes,
                 val.writeLock().lock();
 
                 try {
-                    val.status(ValueStatus.FLUSHED);
-
                     if (writeCoalescing) {
-                        StatefulValue<K, V> prev = writeCache.remove(e.getKey());
+                        if (val.status() == ValueStatus.PENDING_AND_UPDATED) {
+                            val.update(val.nextEntry(), val.nextOperation(), ValueStatus.NEW);
+
+                            val.setNext(null, null);
+                        }
+                        else {
+                            val.status(ValueStatus.FLUSHED);
 
-                        // Additional check to ensure consistency.
-                        assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
+                            StatefulValue<K, V> prev = writeCache.remove(e.getKey());
+
+                            // Additional check to ensure consistency.
+                            assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
+                        }
 
                         val.signalFlushed();
                     }
                     else {
+                        val.status(ValueStatus.FLUSHED);
+
                         Flusher f = flusher(e.getKey());
 
                         // Can remove using equal because if map contains another similar value it has different state.
@@ -761,9 +809,16 @@ private boolean applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes,
                 val.writeLock().lock();
 
                 try {
-                    val.status(ValueStatus.RETRY);
+                    if (val.status() == ValueStatus.PENDING_AND_UPDATED) {
+                        val.update(val.nextEntry(), val.nextOperation(), ValueStatus.NEW);
 
-                    retryEntriesCnt.incrementAndGet();
+                        val.setNext(null, null);
+                    }
+                    else {
+                        val.status(ValueStatus.RETRY);
+
+                        retryEntriesCnt.incrementAndGet();
+                    }
 
                     val.signalFlushed();
                 }
@@ -1116,13 +1171,17 @@ private void flushCacheCoalescing() {
 
                     switch (addRes) {
                         case NEW_BATCH:
+                            // No need to test first value in batch
+                            val.status(ValueStatus.PENDING);
+
+                            val.writeLock().unlock();
+
                             applyBatch(pending, true, null);
 
                             pending = U.newLinkedHashMap(batchSize);
 
-                            // No need to test first value in batch
-                            val.status(ValueStatus.PENDING);
                             pending.put(e.getKey(), val);
+
                             prevOperation = val.operation();
 
                             break;
@@ -1137,7 +1196,8 @@ private void flushCacheCoalescing() {
                     }
                 }
                 finally {
-                    val.writeLock().unlock();
+                    if (val.writeLock().isHeldByCurrentThread())
+                        val.writeLock().unlock();
                 }
             }
 
@@ -1307,6 +1367,12 @@ public BatchingResult tryAddStatefulValue(
         /** Value is captured by flusher and store operation is performed at the moment. */
         PENDING,
 
+        /**
+         * Value is captured by flusher and store operation is performed at the moment.
+         * New update for the key was stored and waiting for previous store operation.
+         */
+        PENDING_AND_UPDATED,
+
         /** Store operation has failed and it will be re-tried at the next flush. */
         RETRY,
 
@@ -1335,7 +1401,7 @@ public BatchingResult tryAddStatefulValue(
      * @return {@code true} if status indicates any pending or complete store update operation.
      */
     private boolean acquired(ValueStatus status) {
-        return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED;
+        return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED || status == ValueStatus.PENDING_AND_UPDATED;
     }
 
     /**
@@ -1352,9 +1418,16 @@ private boolean acquired(ValueStatus status) {
         @GridToStringInclude(sensitive = true)
         private Entry<? extends K, ? extends V> val;
 
+        /** Next value that is waiting for the previous store operation to complete. */
+        @GridToStringInclude(sensitive = true)
+        private Entry<? extends K, ? extends V> nextVal;
+
         /** Store operation. */
         private StoreOperation storeOperation;
 
+        /** Next store operation. */
+        private StoreOperation nextStoreOperation;
+
         /** Value status. */
         private ValueStatus valStatus;
 
@@ -1375,6 +1448,20 @@ private StatefulValue(Entry<? extends K, ? extends V> val, StoreOperation storeO
             valStatus = ValueStatus.NEW;
         }
 
+        /**
+         * @return Next stored value.
+         */
+        private Entry<? extends K, ? extends V> nextEntry() {
+            return nextVal;
+        }
+
+        /**
+         * @return Next store operation.
+         */
+        private StoreOperation nextOperation() {
+            return nextStoreOperation;
+        }
+
         /**
          * @return Stored value.
          */
@@ -1420,6 +1507,18 @@ private void update(@Nullable Entry<? extends K, ? extends V> val,
             this.valStatus = valStatus;
         }
 
+        /**
+         * Sets the next value that is waiting for the previous store operation to complete.
+         *
+         * @param val Value.
+         * @param storeOperation Store operation.
+         */
+        private void setNext(@Nullable Entry<? extends K, ? extends V> val,
+            StoreOperation storeOperation) {
+            this.nextVal = val;
+            this.nextStoreOperation = storeOperation;
+        }
+
         /**
          * Awaits a signal on flush condition.
          *
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindStoreWithCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindStoreWithCoalescingTest.java
new file mode 100644
index 000000000000..cbcc22f7e3cc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindStoreWithCoalescingTest.java
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.store.jdbc.model.TestJdbcPojoDataSourceFactory;
+import org.apache.ignite.cache.store.jdbc.model.TestJdbcPojoStoreFactoryWithHangWriteAll;
+import org.apache.ignite.cache.store.jdbc.model.TestPojo;
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static org.apache.ignite.configuration.DataPageEvictionMode.RANDOM_LRU;
+import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
+
+/**
+ * Tests for {@link CacheJdbcPojoStore} used with write-behind enabled.
+ */
+@RunWith(JUnit4.class)
+public class CacheJdbcPojoWriteBehindStoreWithCoalescingTest extends GridCommonAbstractTest {
+    /** */
+    private static final String DFLT_CONN_URL = "jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1";
+
+    /** */
+    private boolean isHangOnWriteAll = false;
+
+    /** */
+    private boolean isSmallRegion = false;
+
+    /** */
+    private final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 20 * 60 * 1000; //20 min
+    }
+
+    /** */
+    public DataStorageConfiguration getDataStorageConfiguration() {
+        DataStorageConfiguration memCfg = new DataStorageConfiguration();
+
+        DataRegionConfiguration plc = new DataRegionConfiguration();
+
+        plc.setName("Default_Region");
+
+        plc.setPageEvictionMode(RANDOM_LRU);
+
+        if (isSmallRegion)
+            plc.setMaxSize(128L * 1024 * 1024); // 128 MB
+        else
+            plc.setMaxSize(1L * 1024 * 1024 * 1024); // 1GB
+
+        memCfg.setDefaultDataRegionConfiguration(plc);
+
+        memCfg.setWalMode(LOG_ONLY);
+
+        return memCfg;
+    }
+
+    /** */
+    public TestJdbcPojoDataSourceFactory getDataSourceFactory() {
+        TestJdbcPojoDataSourceFactory testJdbcPojoDataSourceFactory = new TestJdbcPojoDataSourceFactory();
+
+        testJdbcPojoDataSourceFactory.setURL("jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1");
+
+        testJdbcPojoDataSourceFactory.setUserName("sa");
+
+        testJdbcPojoDataSourceFactory.setPassword("");
+
+        return testJdbcPojoDataSourceFactory;
+    }
+
+    /** */
+    public JdbcType getJdbcType() {
+        JdbcType type = new JdbcType();
+
+        type.setCacheName("TEST_CACHE");
+
+        type.setKeyType(Integer.class);
+
+        type.setValueType(TestPojo.class);
+
+        type.setDatabaseSchema("PUBLIC");
+
+        type.setDatabaseTable("TEST_CACHE");
+
+        type.setKeyFields(new JdbcTypeField(java.sql.Types.INTEGER, "VALUE2", Integer.class, "value2"));
+
+        type.setValueFields(
+            new JdbcTypeField(java.sql.Types.VARCHAR, "VALUE1", String.class, "value1"),
+            new JdbcTypeField(java.sql.Types.DATE, "VALUE3", java.sql.Date.class, "value3")
+        );
+
+        return type;
+    }
+
+    /** */
+    public CacheJdbcPojoStoreFactory getStoreFactory() {
+        CacheJdbcPojoStoreFactory storeFactory = new CacheJdbcPojoStoreFactory();
+
+        storeFactory.setParallelLoadCacheMinimumThreshold(100);
+
+        storeFactory.setBatchSize(100);
+
+        storeFactory.setMaximumPoolSize(4);
+
+        storeFactory.setDataSourceFactory(getDataSourceFactory());
+
+        storeFactory.setDialect(new H2Dialect());
+
+        storeFactory.setTypes(getJdbcType());
+
+        return storeFactory;
+    }
+
+    /** */
+    public CacheJdbcPojoStoreFactory getStoreFactoryWithHangWriteAll() {
+        TestJdbcPojoStoreFactoryWithHangWriteAll storeFactory = new TestJdbcPojoStoreFactoryWithHangWriteAll();
+
+        storeFactory.setParallelLoadCacheMinimumThreshold(100);
+
+        storeFactory.setBatchSize(100);
+
+        storeFactory.setMaximumPoolSize(4);
+
+        storeFactory.setDataSourceFactory(getDataSourceFactory());
+
+        storeFactory.setDialect(new H2Dialect());
+
+        storeFactory.setTypes(getJdbcType());
+
+        return storeFactory;
+    }
+
+    /** */
+    public CacheConfiguration getCacheConfiguration() {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName("TEST_CACHE");
+
+        ccfg.setCacheMode(REPLICATED);
+
+        ccfg.setAtomicityMode(ATOMIC);
+
+        ccfg.setPartitionLossPolicy(READ_WRITE_SAFE);
+
+        ccfg.setReadThrough(true);
+
+        ccfg.setWriteThrough(true);
+
+        ccfg.setWriteBehindEnabled(true);
+
+        ccfg.setWriteBehindBatchSize(1000);
+
+        QueryEntity queryEntity = new QueryEntity();
+
+        queryEntity.setKeyType("java.lang.Integer");
+
+        queryEntity.setValueType("org.apache.ignite.cache.store.jdbc.model.TestPojo");
+
+        queryEntity.setTableName("TEST_CACHE");
+
+        queryEntity.setKeyFieldName("value3");
+
+        Set<String> keyFiles = new HashSet<>();
+
+        keyFiles.add("value3");
+
+        queryEntity.setKeyFields(keyFiles);
+
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+
+        fields.put("value1", "java.lang.String");
+
+        fields.put("value2", "java.lang.Integer");
+
+        fields.put("value3", "java.sql.Date");
+
+        queryEntity.setFields(fields);
+
+        Map<String, String> aliases = new HashMap<>();
+
+        aliases.put("value1", "VALUE1");
+
+        aliases.put("value2", "VALUE2");
+
+        aliases.put("value3", "VALUE3");
+
+        queryEntity.setAliases(aliases);
+
+        ArrayList<QueryEntity> queryEntities = new ArrayList<>();
+
+        queryEntities.add(queryEntity);
+
+        ccfg.setQueryEntities(queryEntities);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        //Data Storage
+
+        cfg.setDataStorageConfiguration(getDataStorageConfiguration());
+
+        //cache configuration
+
+        CacheConfiguration ccfg = getCacheConfiguration();
+
+        if (isHangOnWriteAll)
+            ccfg.setCacheStoreFactory(getStoreFactoryWithHangWriteAll());
+        else
+            ccfg.setCacheStoreFactory(getStoreFactory());
+
+        cfg.setCacheConfiguration(ccfg);
+
+        //discovery
+
+        TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
+
+        TcpDiscoveryVmIpFinder tcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder();
+
+        ArrayList<String> addrs = new ArrayList<>();
+
+        addrs.add("127.0.0.1:47500..47509");
+
+        tcpDiscoveryVmIpFinder.setAddresses(addrs);
+
+        tcpDiscoverySpi.setIpFinder(tcpDiscoveryVmIpFinder);
+
+        cfg.setDiscoverySpi(tcpDiscoverySpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        testFailed.set(false);
+
+        cleanPersistenceDir();
+
+        try {
+            Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+
+            Statement stmt = conn.createStatement();
+
+            stmt.executeUpdate("DROP TABLE IF EXISTS TEST_CACHE");
+
+            stmt.executeUpdate("CREATE TABLE TEST_CACHE (" +
+                " VALUE2 INTEGER PRIMARY KEY," +
+                " VALUE1 VARCHAR(50)," +
+                " VALUE3 DATE" +
+                ")"
+            );
+
+            conn.commit();
+
+            U.closeQuiet(stmt);
+
+            U.closeQuiet(conn);
+        } catch (SQLException ex) {
+            fail(ex.getMessage());
+        }
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    public void checkCacheStore(IgniteCache<Integer, TestPojo> cache) {
+        try {
+            Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+
+            Statement stmt = conn.createStatement();
+
+            ResultSet rs = stmt.executeQuery(" SELECT * FROM TEST_CACHE");
+
+            int count = 0;
+
+            while (rs.next()) {
+                String value1 = rs.getString("VALUE1");
+
+                Integer value2 = rs.getInt("VALUE2");
+
+                java.sql.Date value3 = rs.getDate("VALUE3");
+
+                TestPojo pojo = cache.get(value2);
+
+                assertNotNull(pojo);
+
+                Calendar c1 = Calendar.getInstance();
+
+                c1.setTime(value3);
+
+                Calendar c2 = Calendar.getInstance();
+
+                c2.setTime(pojo.getValue3());
+
+                assertEquals(value1, pojo.getValue1());
+
+                assertEquals(value2, pojo.getValue2());
+
+                assertEquals(c1.get(Calendar.DAY_OF_YEAR), c2.get(Calendar.DAY_OF_YEAR));
+
+                assertEquals(c1.get(Calendar.YEAR), c2.get(Calendar.YEAR));
+
+                assertEquals(c1.get(Calendar.MONTH), c2.get(Calendar.MONTH));
+
+                count++;
+            }
+
+            assertEquals(count, cache.size());
+
+            U.closeQuiet(stmt);
+
+            U.closeQuiet(conn);
+        } catch (SQLException ex) {
+            fail();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHangWriteAllWithCoalescing() throws Exception {
+        isHangOnWriteAll = true;
+
+        writeAllWithCoalescing();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNormalWriteAllWithCoalescing() throws Exception {
+        isHangOnWriteAll = false;
+
+        writeAllWithCoalescing();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReadWithCoalescingSmallRegionWithEviction() throws Exception {
+        isHangOnWriteAll = false;
+
+        isSmallRegion = true;
+
+        readWithCoalescing();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReadWithCoalescingNormalRegion() throws Exception {
+        isHangOnWriteAll = false;
+
+        isSmallRegion = false;
+
+        readWithCoalescing();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUpdateAndReadTheSameKeyWithCoalescing() throws Exception {
+        isHangOnWriteAll = false;
+
+        isSmallRegion = false;
+
+        updateAndReadWithCoalescingSameKey();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUpdateAndReadTheSameKeyWithCoalescingHangWriteAll() throws Exception {
+        isHangOnWriteAll = true;
+
+        isSmallRegion = false;
+
+        updateAndReadWithCoalescingSameKey();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void updateAndReadWithCoalescingSameKey() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
+
+        AtomicInteger t1Count = new AtomicInteger(5);
+
+        AtomicInteger t2Count = new AtomicInteger(5);
+
+        Thread t1 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t1Count.get() > 0) {
+                        for (int i = 0; i < 200000; i++) {
+                            TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+                            cache.put(1, next);
+
+                            TestPojo ret = cache.get(1);
+
+                            assertEquals(ret, next);
+                        }
+
+                        t1Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t2Count.get() > 0) {
+                        for (int i = 200000; i < 400000; i++) {
+                            TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+                            cache.put(2, next);
+
+                            TestPojo ret = cache.get(2);
+
+                            assertEquals(ret, next);
+                        }
+
+                        t2Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        TestErrorHandler handler = new TestErrorHandler();
+
+        t1.setUncaughtExceptionHandler(handler);
+
+        t2.setUncaughtExceptionHandler(handler);
+
+        t1.start();
+
+        t2.start();
+
+        t1.join();
+
+        t2.join();
+
+        assertFalse(testFailed.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void readWithCoalescing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
+
+        AtomicInteger t1Count = new AtomicInteger(5);
+
+        AtomicInteger t2Count = new AtomicInteger(5);
+
+        Thread t1 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t1Count.get() > 0) {
+                        for (int i = 0; i < 200000; i++) {
+                            TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+                            cache.put(i, next);
+
+                            TestPojo ret = cache.get(i);
+
+                            assertEquals(ret, next);
+                        }
+
+                        t1Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t2Count.get() > 0) {
+                        for (int i = 200000; i < 400000; i++) {
+                            TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+                            cache.put(i, next);
+
+                            TestPojo ret = cache.get(i);
+
+                            assertEquals(ret, next);
+                        }
+
+                        t2Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        TestErrorHandler handler = new TestErrorHandler();
+
+        t1.setUncaughtExceptionHandler(handler);
+
+        t2.setUncaughtExceptionHandler(handler);
+
+        t1.start();
+
+        t2.start();
+
+        t1.join();
+
+        t2.join();
+
+        assertFalse(testFailed.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void writeAllWithCoalescing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
+
+        AtomicInteger t1Count = new AtomicInteger(10);
+
+        AtomicInteger t2Count = new AtomicInteger(10);
+
+        Thread t1 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t1Count.get() > 0) {
+                        for (int i = 0; i < 5000; i++)
+                            cache.put(i, new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime())));
+
+                        t1Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (t2Count.get() > 0) {
+                        for (int i = 0; i < 5000; i++)
+                            cache.put(i, new TestPojo("UPDATE" + i, i, new java.sql.Date(new java.util.Date().getTime())));
+
+                        try {
+                            U.sleep(500);
+                        }
+                        catch (IgniteInterruptedCheckedException e) {
+                            e.printStackTrace();
+                        }
+
+                        t2Count.decrementAndGet();
+                    }
+                } catch (CacheException e) {
+                    //ignore
+                }
+            }
+        });
+
+        t1.start();
+
+        t2.start();
+
+        // t1 should be completed within 10 seconds.
+        U.sleep(10_000);
+
+        assertEquals(0, t1Count.get());
+
+        t1.join();
+
+        t2.join();
+
+        assertEquals(0, t2Count.get());
+
+        // Now wait until the updates are applied on the store side and check that the data set is the same.
+        if (isHangOnWriteAll)
+            // Max time -> 10000 updates that will be sent in batches of 1000 -> (10000 / 1000) * 10 = 100 seconds.
+            U.sleep(100_000);
+        else
+            U.sleep(10_000);
+
+        checkCacheStore(cache);
+    }
+
+    /**
+     * Sets the {@code testFailed} flag when a thread terminates with an uncaught exception.
+     */
+    private class TestErrorHandler implements Thread.UncaughtExceptionHandler {
+        /** {@inheritDoc} */
+        @Override public void uncaughtException(Thread t, Throwable e) {
+            testFailed.set(true);
+        }
+    }
+}
+
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSource.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSource.java
new file mode 100644
index 000000000000..627b4a36fb5b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSource.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.io.PrintWriter;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.logging.Logger;
+import javax.sql.DataSource;
+
+/**
+ * Test JDBC POJO DataSource.
+ * <p>
+ * In per-thread mode (the default) every thread gets a single {@link ConnectionHolder} that is reused
+ * across {@link #getConnection()} calls and whose {@code close()} does not close the underlying connection.
+ * With per-thread mode switched off, each {@link #getConnection()} call opens a new connection through
+ * {@link DriverManager}.
+ */
+public class TestJdbcPojoDataSource implements DataSource {
+    /** Per-thread connection holder. */
+    private final ThreadLocal<ConnectionHolder> holder = new ThreadLocal<ConnectionHolder>() {
+        @Override protected ConnectionHolder initialValue() {
+            return new ConnectionHolder();
+        }
+    };
+
+    /** Whether {@link #getConnection()} returns the calling thread's reusable holder. */
+    private volatile boolean perThreadMode = true;
+
+    /** JDBC URL passed to {@link DriverManager}. */
+    private String url;
+
+    /** Database user name passed to {@link DriverManager}. */
+    private String username;
+
+    /** Database password passed to {@link DriverManager}. */
+    private String password;
+
+    /** Idle time in milliseconds after which a held connection is validated before reuse. */
+    private long idleCheckTimeout = 30_000;
+
+    /**
+     * Loads the JDBC driver class so that it can register itself.
+     *
+     * @param driverClassName Fully qualified driver class name.
+     * @throws IllegalStateException If the class cannot be found.
+     */
+    public void setDriverClassName(String driverClassName) {
+        try {
+            Class.forName(driverClassName);
+        }
+        catch (ClassNotFoundException e) {
+            throw new IllegalStateException("JDBC driver class not found: " + driverClassName, e);
+        }
+    }
+
+    /**
+     * @param url JDBC URL.
+     */
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    /**
+     * @param username Database user name.
+     */
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    /**
+     * @param password Database password.
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * @param idleCheckTimeout Idle time in milliseconds after which a held connection is validated.
+     */
+    public void setIdleCheckTimeout(long idleCheckTimeout) {
+        this.idleCheckTimeout = idleCheckTimeout;
+    }
+
+    /**
+     * @param perThreadMode {@code true} to hand out per-thread holders, {@code false} to open a new
+     *      connection on every {@link #getConnection()} call.
+     */
+    void switchPerThreadMode(boolean perThreadMode) {
+        this.perThreadMode = perThreadMode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Connection getConnection() throws SQLException {
+        if (!perThreadMode)
+            return createConnection();
+
+        ConnectionHolder threadHolder = holder.get();
+
+        threadHolder.initIfNeeded();
+
+        return threadHolder;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Connection getConnection(String username, String password) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setLoginTimeout(int seconds) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getLoginTimeout() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /**
+     * Returns this data source when it implements the requested interface.
+     *
+     * @param iface Requested interface.
+     * @return This instance cast to {@code iface}.
+     * @throws SQLException If this data source does not implement {@code iface}.
+     */
+    @Override public <T> T unwrap(Class<T> iface) throws SQLException {
+        if (isWrapperFor(iface))
+            return iface.cast(this);
+
+        // Same exception type as before, so existing callers keep working.
+        throw new SQLFeatureNotSupportedException("DataSource is not a wrapper for: " + iface);
+    }
+
+    /**
+     * @param iface Interface to check.
+     * @return {@code true} if this data source implements {@code iface}.
+     */
+    @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return iface != null && iface.isInstance(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PrintWriter getLogWriter() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setLogWriter(PrintWriter out) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /**
+     * Opens a new physical connection using the configured URL and credentials.
+     *
+     * @return New connection.
+     * @throws SQLException If the connection cannot be opened.
+     */
+    private Connection createConnection() throws SQLException {
+        return DriverManager.getConnection(url, username, password);
+    }
+
+    /**
+     * Connection wrapper kept per thread. It lazily opens the underlying connection, replaces it when it is
+     * closed or found invalid after an idle period, and delegates every other call to it.
+     */
+    private class ConnectionHolder implements Connection {
+        /** Underlying connection; {@code null} until {@link #initIfNeeded()} is first called. */
+        private Connection conn;
+
+        /** Timestamp (milliseconds) of the last {@link #initIfNeeded()} call. */
+        private long lastAccessTs = System.currentTimeMillis();
+
+        /**
+         * Makes sure an open underlying connection exists. A connection idle for longer than the configured
+         * timeout is validated and replaced if it is no longer valid.
+         *
+         * @throws SQLException If a connection cannot be checked or opened.
+         */
+        void initIfNeeded() throws SQLException {
+            if (conn == null || conn.isClosed())
+                conn = createConnection();
+            else if (System.currentTimeMillis() - lastAccessTs > idleCheckTimeout && !conn.isValid(5_000)) {
+                try {
+                    conn.close();
+                }
+                catch (SQLException ignored) {
+                    // The connection is already known to be invalid; failing to close it must not block replacement.
+                }
+
+                conn = createConnection();
+            }
+
+            lastAccessTs = System.currentTimeMillis();
+        }
+
+        /**
+         * Closes the underlying connection only when per-thread mode is off; in per-thread mode this is a
+         * no-op so the connection can be reused by the owning thread.
+         */
+        @Override public void close() throws SQLException {
+            if (!perThreadMode && conn != null)
+                conn.close();
+        }
+
+        // --- Delegation. ---
+
+        /** {@inheritDoc} */
+        @Override public boolean isClosed() throws SQLException {
+            return conn.isClosed();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Statement createStatement() throws SQLException {
+            return conn.createStatement();
+        }
+
+        /** {@inheritDoc} */
+        @Override public PreparedStatement prepareStatement(String sql) throws SQLException {
+            return conn.prepareStatement(sql);
+        }
+
+        /** {@inheritDoc} */
+        @Override public CallableStatement prepareCall(String sql) throws SQLException {
+            return conn.prepareCall(sql);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String nativeSQL(String sql) throws SQLException {
+            return conn.nativeSQL(sql);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setAutoCommit(boolean autoCommit) throws SQLException {
+            conn.setAutoCommit(autoCommit);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean getAutoCommit() throws SQLException {
+            return conn.getAutoCommit();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void commit() throws SQLException {
+            conn.commit();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void rollback() throws SQLException {
+            conn.rollback();
+        }
+
+        /** {@inheritDoc} */
+        @Override public DatabaseMetaData getMetaData() throws SQLException {
+            return conn.getMetaData();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setReadOnly(boolean readOnly) throws SQLException {
+            conn.setReadOnly(readOnly);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isReadOnly() throws SQLException {
+            return conn.isReadOnly();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setCatalog(String catalog) throws SQLException {
+            conn.setCatalog(catalog);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getCatalog() throws SQLException {
+            return conn.getCatalog();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setTransactionIsolation(int level) throws SQLException {
+            conn.setTransactionIsolation(level);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getTransactionIsolation() throws SQLException {
+            return conn.getTransactionIsolation();
+        }
+
+        /** {@inheritDoc} */
+        @Override public SQLWarning getWarnings() throws SQLException {
+            return conn.getWarnings();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void clearWarnings() throws SQLException {
+            conn.clearWarnings();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+            return conn.createStatement(resultSetType, resultSetConcurrency);
+        }
+
+        /** {@inheritDoc} */
+        @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+            return conn.prepareStatement(sql, resultSetType, resultSetConcurrency);
+        }
+
+        /** {@inheritDoc} */
+        @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+            return conn.prepareCall(sql, resultSetType, resultSetConcurrency);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<String, Class<?>> getTypeMap() throws SQLException {
+            return conn.getTypeMap();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+            conn.setTypeMap(map);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setHoldability(int holdability) throws SQLException {
+            conn.setHoldability(holdability);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getHoldability() throws SQLException {
+            return conn.getHoldability();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Savepoint setSavepoint() throws SQLException {
+            return conn.setSavepoint();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Savepoint setSavepoint(String name) throws SQLException {
+            return conn.setSavepoint(name);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void rollback(Savepoint savepoint) throws SQLException {
+            conn.rollback(savepoint);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+            conn.releaseSavepoint(savepoint);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        /** {@inheritDoc} */
+        @Override public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        /** {@inheritDoc} */
+        @Override public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        /** {@inheritDoc} */
+        @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+            return conn.prepareStatement(sql, autoGeneratedKeys);
+        }
+
+        /** {@inheritDoc} */
+        @Override public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+            return conn.prepareStatement(sql, columnIndexes);
+        }
+
+        /** {@inheritDoc} */
+        @Override public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+            return conn.prepareStatement(sql, columnNames);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Clob createClob() throws SQLException {
+            return conn.createClob();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Blob createBlob() throws SQLException {
+            return conn.createBlob();
+        }
+
+        /** {@inheritDoc} */
+        @Override public NClob createNClob() throws SQLException {
+            return conn.createNClob();
+        }
+
+        /** {@inheritDoc} */
+        @Override public SQLXML createSQLXML() throws SQLException {
+            return conn.createSQLXML();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isValid(int timeout) throws SQLException {
+            return conn.isValid(timeout);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setClientInfo(String name, String value) throws SQLClientInfoException {
+            conn.setClientInfo(name, value);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setClientInfo(Properties properties) throws SQLClientInfoException {
+            conn.setClientInfo(properties);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getClientInfo(String name) throws SQLException {
+            return conn.getClientInfo(name);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Properties getClientInfo() throws SQLException {
+            return conn.getClientInfo();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+            return conn.createArrayOf(typeName, elements);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+            return conn.createStruct(typeName, attributes);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setSchema(String schema) throws SQLException {
+            conn.setSchema(schema);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getSchema() throws SQLException {
+            return conn.getSchema();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void abort(Executor executor) throws SQLException {
+            conn.abort(executor);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+            conn.setNetworkTimeout(executor, milliseconds);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getNetworkTimeout() throws SQLException {
+            return conn.getNetworkTimeout();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> T unwrap(Class<T> iface) throws SQLException {
+            return conn.unwrap(iface);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
+            return conn.isWrapperFor(iface);
+        }
+    }
+}
+
+
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSourceFactory.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSourceFactory.java
new file mode 100644
index 000000000000..340751faec36
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSourceFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.util.Objects;
+import javax.cache.configuration.Factory;
+import javax.sql.DataSource;
+
+/**
+ * Test JDBC POJO DataSource factory.
+ * <p>
+ * Creates {@link TestJdbcPojoDataSource} instances from the configured URL, user name and password.
+ * Any property left unset falls back to the in-memory H2 test database defaults.
+ */
+public class TestJdbcPojoDataSourceFactory implements Factory<DataSource> {
+    /** JDBC URL used when no URL is configured. */
+    private static final String DFLT_URL = "jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1";
+
+    /** User name used when no user name is configured. */
+    private static final String DFLT_USER_NAME = "sa";
+
+    /** Password used when no password is configured. */
+    private static final String DFLT_PASSWORD = "";
+
+    /** JDBC URL. */
+    private String URL;
+
+    /** Database user name. */
+    private String userName;
+
+    /** Database password. */
+    private String password;
+
+    /**
+     * Creates a data source from the configured properties, using the default for each unset property.
+     *
+     * @return New data source.
+     */
+    @Override public DataSource create() {
+        TestJdbcPojoDataSource ds = new TestJdbcPojoDataSource();
+
+        // Previously the configured values were ignored and the defaults were always used.
+        ds.setUrl(URL != null ? URL : DFLT_URL);
+
+        ds.setUsername(userName != null ? userName : DFLT_USER_NAME);
+
+        ds.setPassword(password != null ? password : DFLT_PASSWORD);
+
+        return ds;
+    }
+
+    /**
+     * @return Configured JDBC URL, or {@code null} if not set.
+     */
+    public String getURL() {
+        return URL;
+    }
+
+    /**
+     * @param URL JDBC URL.
+     */
+    public void setURL(String URL) {
+        this.URL = URL;
+    }
+
+    /**
+     * @return Configured user name, or {@code null} if not set.
+     */
+    public String getUserName() {
+        return userName;
+    }
+
+    /**
+     * @param userName Database user name.
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    /**
+     * @return Configured password, or {@code null} if not set.
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * @param password Database password.
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        TestJdbcPojoDataSourceFactory factory = (TestJdbcPojoDataSourceFactory)o;
+
+        return Objects.equals(URL, factory.URL) &&
+            Objects.equals(userName, factory.userName) &&
+            Objects.equals(password, factory.password);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(URL, userName, password);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "TestJdbcPojoDataSourceFactory{" +
+            "URL='" + URL + '\'' +
+            ", userName='" + userName + '\'' +
+            ", password='" + password + '\'' +
+            '}';
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoStoreFactoryWithHangWriteAll.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoStoreFactoryWithHangWriteAll.java
new file mode 100644
index 000000000000..ebe93f334073
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoStoreFactoryWithHangWriteAll.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.sql.PreparedStatement;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.sql.DataSource;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory;
+import org.apache.ignite.cache.store.jdbc.JdbcTypeField;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test JDBC POJO store factory whose stores pause after every {@code writeAll} batch.
+ * <p>
+ * The created store logs write and delete failures instead of propagating them.
+ */
+public class TestJdbcPojoStoreFactoryWithHangWriteAll<K, V> extends CacheJdbcPojoStoreFactory<K, V> {
+    /** Pause after each successful {@code writeAll} batch, in milliseconds. */
+    private static final long WRITE_ALL_DELAY = 10_000;
+
+    /**
+     * Total number of entries written through {@code writeAll} by all stores. Atomic because
+     * {@code writeAll} may be invoked from several threads at once.
+     */
+    private static final AtomicLong count = new AtomicLong();
+
+    /**
+     * Creates a store configured with this factory's settings.
+     *
+     * @return New store whose {@code writeAll} pauses after each batch.
+     */
+    @Override public CacheJdbcPojoStore<K, V> create() {
+        CacheJdbcPojoStore<K, V> store = new TestJdbcPojoStoreWithHangWriteAll<>();
+
+        store.setBatchSize(getBatchSize());
+        store.setDialect(getDialect());
+        store.setMaximumPoolSize(getMaximumPoolSize());
+        store.setMaximumWriteAttempts(getMaximumWriteAttempts());
+        store.setParallelLoadCacheMinimumThreshold(getParallelLoadCacheMinimumThreshold());
+        store.setTypes(getTypes());
+        store.setHasher(getHasher());
+        store.setTransformer(getTransformer());
+        store.setSqlEscapeAll(isSqlEscapeAll());
+        store.setDataSource(getDataSourceFactory().create());
+
+        return store;
+    }
+
+    /**
+     * JDBC POJO store that logs failures and delays after each {@code writeAll} batch.
+     */
+    public static class TestJdbcPojoStoreWithHangWriteAll<K,V> extends CacheJdbcPojoStore<K,V> {
+        /**
+         * Delegates to the parent implementation; a failure is logged and then rethrown.
+         */
+        @Override protected void fillParameter(PreparedStatement stmt, int idx, JdbcTypeField field, @Nullable Object fieldVal) throws CacheException {
+            try {
+                super.fillParameter(stmt, idx, field, fieldVal);
+            }
+            catch (Exception e) {
+                log.error("Failed to fill parameter [idx=" + idx + ", field=" + field + ", val=" + fieldVal + ']', e);
+
+                throw e;
+            }
+        }
+
+        /**
+         * Loads the cache with per-thread mode of a {@link TestJdbcPojoDataSource} switched off for the
+         * duration of the load; the mode is switched back on afterwards, even on failure.
+         */
+        @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) throws CacheLoaderException {
+            DataSource ds = getDataSource();
+
+            try {
+                if (ds instanceof TestJdbcPojoDataSource)
+                    ((TestJdbcPojoDataSource)ds).switchPerThreadMode(false);
+
+                super.loadCache(clo, args);
+            }
+            finally {
+                if (ds instanceof TestJdbcPojoDataSource)
+                    ((TestJdbcPojoDataSource)ds).switchPerThreadMode(true);
+            }
+        }
+
+        /** Deletes the entry; a failure is logged and not propagated. */
+        @Override public void delete(Object key) throws CacheWriterException {
+            try {
+                super.delete(key);
+            }
+            catch (Exception e) {
+                log.error("Failed to delete entry from cache store: " + key, e);
+            }
+        }
+
+        /** Deletes the entries; a failure is logged and not propagated. */
+        @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
+            try {
+                super.deleteAll(keys);
+            }
+            catch (Exception e) {
+                log.error("Failed to delete entries from cache store: " + keys, e);
+            }
+        }
+
+        /** Writes the entry; a failure is logged and not propagated. */
+        @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+            try {
+                super.write(entry);
+            }
+            catch (Exception e) {
+                log.error("Failed to write entry to cache store: " + entry, e);
+            }
+        }
+
+        /**
+         * Writes the batch, pauses, then adds the batch size to the shared counter and logs the total.
+         * A failure is logged and not propagated.
+         */
+        @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException {
+            try {
+                super.writeAll(entries);
+
+                Thread.sleep(WRITE_ALL_DELAY);
+
+                long total = count.addAndGet(entries.size());
+
+                log.info("Count of load data: " + total);
+            }
+            catch (InterruptedException e) {
+                // Keep the interrupt visible to the calling thread instead of silently clearing it.
+                Thread.currentThread().interrupt();
+
+                log.error("Failed to write entries to cache store: " + entries, e);
+            }
+            catch (Exception e) {
+                log.error("Failed to write entries to cache store: " + entries, e);
+            }
+        }
+    }
+}
+
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestPojo.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestPojo.java
new file mode 100644
index 000000000000..042362990277
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestPojo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import java.sql.Date;
+
+/**
+ * Test JDBC POJO object.
+ */
+public class TestPojo implements Serializable {
+    /** */
+    @QuerySqlField
+    private String value1;
+
+    /** */
+    @QuerySqlField
+    private Integer value2;
+
+    /** */
+    @QuerySqlField
+    private Date value3;
+
+    /**
+     * Default constructor.
+     */
+    public TestPojo() {
+        // No-op.
+    }
+
+    /** */
+    public TestPojo(String value1, int value2, Date value3) {
+        this.value1 = value1;
+
+        this.value2 = value2;
+
+        this.value3 = value3;
+    }
+
+    /** */
+    public String getValue1() {
+        return value1;
+    }
+
+    /** */
+    public void setValue1(String value1) {
+        this.value1 = value1;
+    }
+
+    /** */
+    public Integer getValue2() {
+        return value2;
+    }
+
+    /** */
+    public void setValue2(Integer value2) {
+        this.value2 = value2;
+    }
+
+    /** */
+    public Date getValue3() {
+        return value3;
+    }
+
+    /** */
+    public void setValue3(Date value3) {
+        this.value3 = value3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        TestPojo pogo = (TestPojo)o;
+
+        return Objects.equals(value1, pogo.value1) &&
+            Objects.equals(value2, pogo.value2) &&
+            Objects.equals(value3, pogo.value3);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+
+        return Objects.hash(value1, value2, value3);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "TestPojo{" +
+            "value1='" + value1 + '\'' +
+            ", value2=" + value2 +
+            ", value3=" + value3 +
+            '}';
+    }
+}
+
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
index b242ac97d602..46adbdefce60 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.store;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -168,12 +169,15 @@ private void testSimpleStore(boolean writeCoalescing) throws Exception {
             assertEquals("v1", store.load(1));
             assertEquals("v2", store.load(2));
             assertNull(store.load(3));
+            assertEquals(store.loadAll(Arrays.asList(3, 4, 5)).size(), 0);
 
             store.delete(1);
 
             assertNull(store.load(1));
+            assertEquals(store.loadAll(Arrays.asList(1)).size(), 0);
             assertEquals("v2", store.load(2));
             assertNull(store.load(3));
+            assertEquals(store.loadAll(Arrays.asList(3)).size(), 0);
         }
         finally {
             shutdownStore();
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 07d26aa06a95..dd773e1acbaf 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -35,6 +35,7 @@
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerWithSqlEscapeSelfTest;
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreMultitreadedSelfTest;
 import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoWriteBehindStoreWithCoalescingTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
 import org.apache.ignite.cache.store.jdbc.JdbcTypesDefaultTransformerTest;
@@ -243,6 +244,7 @@ public static TestSuite suite(Collection<Class> ignoredTests) {
         GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreBinaryMarshallerWithSqlEscapeSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreBinaryMarshallerStoreKeepBinaryWithSqlEscapeSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreMultitreadedSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoWriteBehindStoreWithCoalescingTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheBalancingStoreSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheAffinityApiSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheStoreValueBytesSelfTest.class, ignoredTests);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services