You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by il...@apache.org on 2019/01/09 13:46:21 UTC
[ignite] branch master updated: IGNITE-5003 Fixed hanging parallel
write&evict in CacheWriteBehindStore - Fixes #5664.
This is an automated email from the ASF dual-hosted git repository.
ilyak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 7e9fe14 IGNITE-5003 Fixed hanging parallel write&evict in CacheWriteBehindStore - Fixes #5664.
7e9fe14 is described below
commit 7e9fe14a31d743b3d314b7d00e23d971767659fe
Author: Andrei Aleksandrov <ae...@gmail.com>
AuthorDate: Wed Jan 9 16:44:36 2019 +0300
IGNITE-5003 Fixed hanging parallel write&evict in CacheWriteBehindStore - Fixes #5664.
Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
---
.../cache/store/GridCacheWriteBehindStore.java | 149 ++++-
...JdbcPojoWriteBehindStoreWithCoalescingTest.java | 680 +++++++++++++++++++++
.../store/jdbc/model/TestJdbcPojoDataSource.java | 523 ++++++++++++++++
.../jdbc/model/TestJdbcPojoDataSourceFactory.java | 106 ++++
.../TestJdbcPojoStoreFactoryWithHangWriteAll.java | 136 +++++
.../ignite/cache/store/jdbc/model/TestPojo.java | 117 ++++
.../store/GridCacheWriteBehindStoreSelfTest.java | 4 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
8 files changed, 1692 insertions(+), 25 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index fdf3649..c64dbc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -439,10 +439,24 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
val.readLock().lock();
try {
- if (val.operation() == StoreOperation.PUT)
- loaded.put(key, val.entry().getValue());
+ StoreOperation op;
+
+ V value;
+
+ if (writeCoalescing && val.nextOperation() != null) {
+ op = val.nextOperation();
+
+ value = (op == StoreOperation.PUT) ? val.nextEntry().getValue() : null;
+ } else {
+ op = val.operation();
+
+ value = (op == StoreOperation.PUT) ? val.entry().getValue() : null;
+ }
+
+ if (op == StoreOperation.PUT)
+ loaded.put(key, value);
else
- assert val.operation() == StoreOperation.RMV : val.operation();
+ assert op == StoreOperation.RMV : op;
}
finally {
val.readLock().unlock();
@@ -484,9 +498,23 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
val.readLock().lock();
try {
- switch (val.operation()) {
+ StoreOperation op;
+
+ V value;
+
+ if (writeCoalescing && val.nextOperation() != null) {
+ op = val.nextOperation();
+
+ value = (op == StoreOperation.PUT) ? val.nextEntry().getValue() : null;
+ } else {
+ op = val.operation();
+
+ value = (op == StoreOperation.PUT) ? val.entry().getValue() : null;
+ }
+
+ switch (op) {
case PUT:
- return val.entry().getValue();
+ return value;
case RMV:
return null;
@@ -593,11 +621,14 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
prev.writeLock().lock();
try {
- if (prev.status() == ValueStatus.PENDING) {
- // Flush process in progress, try again.
- prev.waitForFlush();
+ if (prev.status() == ValueStatus.PENDING || prev.status() == ValueStatus.PENDING_AND_UPDATED) {
+ // Flush process in progress, save next value and update the status.
- continue;
+ prev.setNext(newVal.val, newVal.storeOperation);
+
+ prev.status(ValueStatus.PENDING_AND_UPDATED);
+
+ break;
}
else if (prev.status() == ValueStatus.FLUSHED)
// This entry was deleted from map before we acquired the lock.
@@ -712,14 +743,22 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
Map<K, Entry<? extends K, ? extends V>> batch = U.newLinkedHashMap(valMap.size());
for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) {
- if (operation == null)
- operation = e.getValue().operation();
+ StatefulValue<K, V> val = e.getValue();
- assert operation == e.getValue().operation();
+ val.readLock().lock();
- assert e.getValue().status() == ValueStatus.PENDING;
+ try {
+ if (operation == null)
+ operation = val.operation();
+
+ assert operation == val.operation();
- batch.put(e.getKey(), e.getValue().entry());
+ assert val.status() == ValueStatus.PENDING || val.status() == ValueStatus.PENDING_AND_UPDATED;
+
+ batch.put(e.getKey(), val.entry());
+ } finally {
+ val.readLock().unlock();
+ }
}
boolean result = updateStore(operation, batch, initSes, flusher);
@@ -731,17 +770,26 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
val.writeLock().lock();
try {
- val.status(ValueStatus.FLUSHED);
-
if (writeCoalescing) {
- StatefulValue<K, V> prev = writeCache.remove(e.getKey());
+ if (val.status() == ValueStatus.PENDING_AND_UPDATED) {
+ val.update(val.nextEntry(), val.nextOperation(), ValueStatus.NEW);
+
+ val.setNext(null, null);
+ }
+ else {
+ val.status(ValueStatus.FLUSHED);
- // Additional check to ensure consistency.
- assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
+ StatefulValue<K, V> prev = writeCache.remove(e.getKey());
+
+ // Additional check to ensure consistency.
+ assert prev == val : "Map value for key " + e.getKey() + " was updated during flush";
+ }
val.signalFlushed();
}
else {
+ val.status(ValueStatus.FLUSHED);
+
Flusher f = flusher(e.getKey());
// Can remove using equal because if map contains another similar value it has different state.
@@ -761,9 +809,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
val.writeLock().lock();
try {
- val.status(ValueStatus.RETRY);
+ if (val.status() == ValueStatus.PENDING_AND_UPDATED) {
+ val.update(val.nextEntry(), val.nextOperation(), ValueStatus.NEW);
- retryEntriesCnt.incrementAndGet();
+ val.setNext(null, null);
+ }
+ else {
+ val.status(ValueStatus.RETRY);
+
+ retryEntriesCnt.incrementAndGet();
+ }
val.signalFlushed();
}
@@ -1116,13 +1171,17 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
switch (addRes) {
case NEW_BATCH:
+ // No need to test first value in batch
+ val.status(ValueStatus.PENDING);
+
+ val.writeLock().unlock();
+
applyBatch(pending, true, null);
pending = U.newLinkedHashMap(batchSize);
- // No need to test first value in batch
- val.status(ValueStatus.PENDING);
pending.put(e.getKey(), val);
+
prevOperation = val.operation();
break;
@@ -1137,7 +1196,8 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
}
finally {
- val.writeLock().unlock();
+ if (val.writeLock().isHeldByCurrentThread())
+ val.writeLock().unlock();
}
}
@@ -1307,6 +1367,12 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
/** Value is captured by flusher and store operation is performed at the moment. */
PENDING,
+ /**
+ * Value is captured by flusher and store operation is performed at the moment.
+ * A new update for the key was stored and is waiting for the previous store operation.
+ */
+ PENDING_AND_UPDATED,
+
/** Store operation has failed and it will be re-tried at the next flush. */
RETRY,
@@ -1335,7 +1401,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
* @return {@code true} if status indicates any pending or complete store update operation.
*/
private boolean acquired(ValueStatus status) {
- return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED;
+ return status == ValueStatus.PENDING || status == ValueStatus.FLUSHED || status == ValueStatus.PENDING_AND_UPDATED;
}
/**
@@ -1352,9 +1418,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
@GridToStringInclude(sensitive = true)
private Entry<? extends K, ? extends V> val;
+ /** Next value that is waiting for a previous store operation. */
+ @GridToStringInclude(sensitive = true)
+ private Entry<? extends K, ? extends V> nextVal;
+
/** Store operation. */
private StoreOperation storeOperation;
+ /** Next store operation. */
+ private StoreOperation nextStoreOperation;
+
/** Value status. */
private ValueStatus valStatus;
@@ -1376,6 +1449,20 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
/**
+ * @return Next stored value.
+ */
+ private Entry<? extends K, ? extends V> nextEntry() {
+ return nextVal;
+ }
+
+ /**
+ * @return Next store operation.
+ */
+ private StoreOperation nextOperation() {
+ return nextStoreOperation;
+ }
+
+ /**
* @return Stored value.
*/
private Entry<? extends K, ? extends V> entry() {
@@ -1421,6 +1508,18 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
}
/**
+ * Adds the next value that is waiting for a previous store operation.
+ *
+ * @param val Value.
+ * @param storeOperation Store operation.
+ */
+ private void setNext(@Nullable Entry<? extends K, ? extends V> val,
+ StoreOperation storeOperation) {
+ this.nextVal = val;
+ this.nextStoreOperation = storeOperation;
+ }
+
+ /**
* Awaits a signal on flush condition.
*
* @throws IgniteInterruptedCheckedException If thread was interrupted.
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindStoreWithCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindStoreWithCoalescingTest.java
new file mode 100644
index 0000000..cbcc22f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindStoreWithCoalescingTest.java
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.store.jdbc.model.TestJdbcPojoDataSourceFactory;
+import org.apache.ignite.cache.store.jdbc.model.TestJdbcPojoStoreFactoryWithHangWriteAll;
+import org.apache.ignite.cache.store.jdbc.model.TestPojo;
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+import static org.apache.ignite.configuration.DataPageEvictionMode.RANDOM_LRU;
+import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
+
+/**
+ * Tests for {@link CacheJdbcPojoStore}.
+ */
+@RunWith(JUnit4.class)
+public class CacheJdbcPojoWriteBehindStoreWithCoalescingTest extends GridCommonAbstractTest {
+ /** */
+ private static final String DFLT_CONN_URL = "jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1";
+
+ /** */
+ private boolean isHangOnWriteAll = false;
+
+ /** */
+ private boolean isSmallRegion = false;
+
+ /** */
+ private final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 20 * 60 * 1000; //20 min
+ }
+
+ /** */
+ public DataStorageConfiguration getDataStorageConfiguration() {
+ DataStorageConfiguration memCfg = new DataStorageConfiguration();
+
+ DataRegionConfiguration plc = new DataRegionConfiguration();
+
+ plc.setName("Default_Region");
+
+ plc.setPageEvictionMode(RANDOM_LRU);
+
+ if (isSmallRegion)
+ plc.setMaxSize(128L * 1024 * 1024); // 128 MB
+ else
+ plc.setMaxSize(1L * 1024 * 1024 * 1024); // 1GB
+
+ memCfg.setDefaultDataRegionConfiguration(plc);
+
+ memCfg.setWalMode(LOG_ONLY);
+
+ return memCfg;
+ }
+
+ /** */
+ public TestJdbcPojoDataSourceFactory getDataSourceFactory() {
+ TestJdbcPojoDataSourceFactory testJdbcPojoDataSourceFactory = new TestJdbcPojoDataSourceFactory();
+
+ testJdbcPojoDataSourceFactory.setURL("jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1");
+
+ testJdbcPojoDataSourceFactory.setUserName("sa");
+
+ testJdbcPojoDataSourceFactory.setPassword("");
+
+ return testJdbcPojoDataSourceFactory;
+ }
+
+ /** */
+ public JdbcType getJdbcType() {
+ JdbcType type = new JdbcType();
+
+ type.setCacheName("TEST_CACHE");
+
+ type.setKeyType(Integer.class);
+
+ type.setValueType(TestPojo.class);
+
+ type.setDatabaseSchema("PUBLIC");
+
+ type.setDatabaseTable("TEST_CACHE");
+
+ type.setKeyFields(new JdbcTypeField(java.sql.Types.INTEGER, "VALUE2", Integer.class, "value2"));
+
+ type.setValueFields(
+ new JdbcTypeField(java.sql.Types.VARCHAR, "VALUE1", String.class, "value1"),
+ new JdbcTypeField(java.sql.Types.DATE, "VALUE3", java.sql.Date.class, "value3")
+ );
+
+ return type;
+ }
+
+ /** */
+ public CacheJdbcPojoStoreFactory getStoreFactory() {
+ CacheJdbcPojoStoreFactory storeFactory = new CacheJdbcPojoStoreFactory();
+
+ storeFactory.setParallelLoadCacheMinimumThreshold(100);
+
+ storeFactory.setBatchSize(100);
+
+ storeFactory.setMaximumPoolSize(4);
+
+ storeFactory.setDataSourceFactory(getDataSourceFactory());
+
+ storeFactory.setDialect(new H2Dialect());
+
+ storeFactory.setTypes(getJdbcType());
+
+ return storeFactory;
+ }
+
+ /** */
+ public CacheJdbcPojoStoreFactory getStoreFactoryWithHangWriteAll() {
+ TestJdbcPojoStoreFactoryWithHangWriteAll storeFactory = new TestJdbcPojoStoreFactoryWithHangWriteAll();
+
+ storeFactory.setParallelLoadCacheMinimumThreshold(100);
+
+ storeFactory.setBatchSize(100);
+
+ storeFactory.setMaximumPoolSize(4);
+
+ storeFactory.setDataSourceFactory(getDataSourceFactory());
+
+ storeFactory.setDialect(new H2Dialect());
+
+ storeFactory.setTypes(getJdbcType());
+
+ return storeFactory;
+ }
+
+ /** */
+ public CacheConfiguration getCacheConfiguration() {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName("TEST_CACHE");
+
+ ccfg.setCacheMode(REPLICATED);
+
+ ccfg.setAtomicityMode(ATOMIC);
+
+ ccfg.setPartitionLossPolicy(READ_WRITE_SAFE);
+
+ ccfg.setReadThrough(true);
+
+ ccfg.setWriteThrough(true);
+
+ ccfg.setWriteBehindEnabled(true);
+
+ ccfg.setWriteBehindBatchSize(1000);
+
+ QueryEntity queryEntity = new QueryEntity();
+
+ queryEntity.setKeyType("java.lang.Integer");
+
+ queryEntity.setValueType("org.apache.ignite.cache.store.jdbc.model.TestPojo");
+
+ queryEntity.setTableName("TEST_CACHE");
+
+ queryEntity.setKeyFieldName("value3");
+
+ Set<String> keyFiles = new HashSet<>();
+
+ keyFiles.add("value3");
+
+ queryEntity.setKeyFields(keyFiles);
+
+ LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+
+ fields.put("value1", "java.lang.String");
+
+ fields.put("value2", "java.lang.Integer");
+
+ fields.put("value3", "java.sql.Date");
+
+ queryEntity.setFields(fields);
+
+ Map<String, String> aliases = new HashMap<>();
+
+ aliases.put("value1", "VALUE1");
+
+ aliases.put("value2", "VALUE2");
+
+ aliases.put("value3", "VALUE3");
+
+ queryEntity.setAliases(aliases);
+
+ ArrayList<QueryEntity> queryEntities = new ArrayList<>();
+
+ queryEntities.add(queryEntity);
+
+ ccfg.setQueryEntities(queryEntities);
+
+ return ccfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ //Data Storage
+
+ cfg.setDataStorageConfiguration(getDataStorageConfiguration());
+
+ //cache configuration
+
+ CacheConfiguration ccfg = getCacheConfiguration();
+
+ if (isHangOnWriteAll)
+ ccfg.setCacheStoreFactory(getStoreFactoryWithHangWriteAll());
+ else
+ ccfg.setCacheStoreFactory(getStoreFactory());
+
+ cfg.setCacheConfiguration(ccfg);
+
+ //discovery
+
+ TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
+
+ TcpDiscoveryVmIpFinder tcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder();
+
+ ArrayList<String> addrs = new ArrayList<>();
+
+ addrs.add("127.0.0.1:47500..47509");
+
+ tcpDiscoveryVmIpFinder.setAddresses(addrs);
+
+ tcpDiscoverySpi.setIpFinder(tcpDiscoveryVmIpFinder);
+
+ cfg.setDiscoverySpi(tcpDiscoverySpi);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ testFailed.set(false);
+
+ cleanPersistenceDir();
+
+ try {
+ Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+
+ Statement stmt = conn.createStatement();
+
+ stmt.executeUpdate("DROP TABLE IF EXISTS TEST_CACHE");
+
+ stmt.executeUpdate("CREATE TABLE TEST_CACHE (" +
+ " VALUE2 INTEGER PRIMARY KEY," +
+ " VALUE1 VARCHAR(50)," +
+ " VALUE3 DATE" +
+ ")"
+ );
+
+ conn.commit();
+
+ U.closeQuiet(stmt);
+
+ U.closeQuiet(conn);
+ } catch (SQLException ex) {
+ fail(ex.getMessage());
+ }
+
+ super.beforeTest();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** */
+ public void checkCacheStore(IgniteCache<Integer, TestPojo> cache) {
+ try {
+ Connection conn = DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+
+ Statement stmt = conn.createStatement();
+
+ ResultSet rs = stmt.executeQuery(" SELECT * FROM TEST_CACHE");
+
+ int count = 0;
+
+ while (rs.next()) {
+ String value1 = rs.getString("VALUE1");
+
+ Integer value2 = rs.getInt("VALUE2");
+
+ java.sql.Date value3 = rs.getDate("VALUE3");
+
+ TestPojo pojo = cache.get(value2);
+
+ assertNotNull(pojo);
+
+ Calendar c1 = Calendar.getInstance();
+
+ c1.setTime(value3);
+
+ Calendar c2 = Calendar.getInstance();
+
+ c2.setTime(pojo.getValue3());
+
+ assertEquals(value1, pojo.getValue1());
+
+ assertEquals(value2, pojo.getValue2());
+
+ assertEquals(c1.get(Calendar.DAY_OF_YEAR), c2.get(Calendar.DAY_OF_YEAR));
+
+ assertEquals(c1.get(Calendar.YEAR), c2.get(Calendar.YEAR));
+
+ assertEquals(c1.get(Calendar.MONTH), c2.get(Calendar.MONTH));
+
+ count++;
+ }
+
+ assertEquals(count, cache.size());
+
+ U.closeQuiet(stmt);
+
+ U.closeQuiet(conn);
+ } catch (SQLException ex) {
+ fail();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testHangWriteAllWithCoalescing() throws Exception {
+ isHangOnWriteAll = true;
+
+ writeAllWithCoalescing();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNormalWriteAllWithCoalescing() throws Exception {
+ isHangOnWriteAll = false;
+
+ writeAllWithCoalescing();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReadWithCoalescingSmallRegionWithEviction() throws Exception {
+ isHangOnWriteAll = false;
+
+ isSmallRegion = true;
+
+ readWithCoalescing();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testReadWithCoalescingNormalRegion() throws Exception {
+ isHangOnWriteAll = false;
+
+ isSmallRegion = false;
+
+ readWithCoalescing();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUpdateAndReadTheSameKeyWithCoalescing() throws Exception {
+ isHangOnWriteAll = false;
+
+ isSmallRegion = false;
+
+ updateAndReadWithCoalescingSameKey();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testUpdateAndReadTheSameKeyWithCoalescingHangWriteAll() throws Exception {
+ isHangOnWriteAll = true;
+
+ isSmallRegion = false;
+
+ updateAndReadWithCoalescingSameKey();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void updateAndReadWithCoalescingSameKey() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
+
+ AtomicInteger t1Count = new AtomicInteger(5);
+
+ AtomicInteger t2Count = new AtomicInteger(5);
+
+ Thread t1 = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ while (t1Count.get() > 0) {
+ for (int i = 0; i < 200000; i++) {
+ TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+ cache.put(1, next);
+
+ TestPojo ret = cache.get(1);
+
+ assertEquals(ret, next);
+ }
+
+ t1Count.decrementAndGet();
+ }
+ } catch (CacheException e) {
+ //ignore
+ }
+ }
+ });
+
+ Thread t2 = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ while (t2Count.get() > 0) {
+ for (int i = 200000; i < 400000; i++) {
+ TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+ cache.put(2, next);
+
+ TestPojo ret = cache.get(2);
+
+ assertEquals(ret, next);
+ }
+
+ t2Count.decrementAndGet();
+ }
+ } catch (CacheException e) {
+ //ignore
+ }
+ }
+ });
+
+ TestErrorHandler handler = new TestErrorHandler();
+
+ t1.setUncaughtExceptionHandler(handler);
+
+ t2.setUncaughtExceptionHandler(handler);
+
+ t1.start();
+
+ t2.start();
+
+ t1.join();
+
+ t2.join();
+
+ assertFalse(testFailed.get());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void readWithCoalescing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
+
+ AtomicInteger t1Count = new AtomicInteger(5);
+
+ AtomicInteger t2Count = new AtomicInteger(5);
+
+ Thread t1 = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ while (t1Count.get() > 0) {
+ for (int i = 0; i < 200000; i++) {
+ TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+ cache.put(i, next);
+
+ TestPojo ret = cache.get(i);
+
+ assertEquals(ret, next);
+ }
+
+ t1Count.decrementAndGet();
+ }
+ } catch (CacheException e) {
+ //ignore
+ }
+ }
+ });
+
+ Thread t2 = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ while (t2Count.get() > 0) {
+ for (int i = 200000; i < 400000; i++) {
+ TestPojo next = new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime()));
+
+ cache.put(i, next);
+
+ TestPojo ret = cache.get(i);
+
+ assertEquals(ret, next);
+ }
+
+ t2Count.decrementAndGet();
+ }
+ } catch (CacheException e) {
+ //ignore
+ }
+ }
+ });
+
+ TestErrorHandler handler = new TestErrorHandler();
+
+ t1.setUncaughtExceptionHandler(handler);
+
+ t2.setUncaughtExceptionHandler(handler);
+
+ t1.start();
+
+ t2.start();
+
+ t1.join();
+
+ t2.join();
+
+ assertFalse(testFailed.get());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void writeAllWithCoalescing() throws Exception {
+ Ignite ignite = startGrid(0);
+
+ ignite.cluster().active(true);
+
+ IgniteCache<Integer, TestPojo> cache = grid(0).cache("TEST_CACHE");
+
+ AtomicInteger t1Count = new AtomicInteger(10);
+
+ AtomicInteger t2Count = new AtomicInteger(10);
+
+ Thread t1 = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ while (t1Count.get() > 0) {
+ for (int i = 0; i < 5000; i++)
+ cache.put(i, new TestPojo("ORIGIN" + i, i, new java.sql.Date(new java.util.Date().getTime())));
+
+ t1Count.decrementAndGet();
+ }
+ } catch (CacheException e) {
+ //ignore
+ }
+ }
+ });
+
+ Thread t2 = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ while (t2Count.get() > 0) {
+ for (int i = 0; i < 5000; i++)
+ cache.put(i, new TestPojo("UPDATE" + i, i, new java.sql.Date(new java.util.Date().getTime())));
+
+ try {
+ U.sleep(500);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace();
+ }
+
+ t2Count.decrementAndGet();
+ }
+ } catch (CacheException e) {
+ //ignore
+ }
+ }
+ });
+
+ t1.start();
+
+ t2.start();
+
+ //t1 should be completed within 10 seconds.
+ U.sleep(10_000);
+
+ assertEquals(0, t1Count.get());
+
+ t1.join();
+
+ t2.join();
+
+ assertEquals(0, t2Count.get());
+
+ //now wait until the updates are done on the store side and check that the data set is the same
+ if (isHangOnWriteAll)
+ //max time -> 10000 updates that will be sent in 1000-entry batches -> (10000 / 1000) * 10 = 100 seconds.
+ U.sleep(100_000);
+ else
+ U.sleep(10_000);
+
+ checkCacheStore(cache);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private class TestErrorHandler implements Thread.UncaughtExceptionHandler {
+ /** {@inheritDoc} */
+ @Override public void uncaughtException(Thread t, Throwable e) {
+ testFailed.set(true);
+ }
+ }
+}
+
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSource.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSource.java
new file mode 100644
index 0000000..627b4a3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSource.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.io.PrintWriter;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.logging.Logger;
+import javax.sql.DataSource;
+
/**
 * Test JDBC POJO DataSource.
 * <p>
 * In per-thread mode (the default) each thread gets its own cached JDBC
 * connection through a {@link ThreadLocal}; when per-thread mode is switched
 * off (e.g. during parallel cache loading) a fresh physical connection is
 * created for every {@link #getConnection()} call.
 */
public class TestJdbcPojoDataSource implements DataSource {
    /** Per-thread connection holder, used while {@link #perThreadMode} is enabled. */
    private final ThreadLocal<ConnectionHolder> holder = new ThreadLocal<ConnectionHolder>() {
        @Override
        protected ConnectionHolder initialValue() {
            return new ConnectionHolder();
        }
    };

    /** Whether connections are cached per thread ({@code true}) or created per call ({@code false}). */
    private volatile boolean perThreadMode = true;

    /** JDBC connection URL. */
    private String url;

    /** JDBC user name. */
    private String username;

    /** JDBC password. */
    private String password;

    /** Idle period in milliseconds after which a cached connection is re-validated before reuse. */
    private long idleCheckTimeout = 30_000;

    /** Loads the JDBC driver class so {@link DriverManager} can locate it. */
    public void setDriverClassName(String driverClassName) {
        try {
            Class.forName(driverClassName);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException("JDBC driver class not found: " + driverClassName, e);
        }
    }

    /** Sets the JDBC connection URL. */
    public void setUrl(String url) {
        this.url = url;
    }

    /** Sets the JDBC user name. */
    public void setUsername(String username) {
        this.username = username;
    }

    /** Sets the JDBC password. */
    public void setPassword(String password) {
        this.password = password;
    }

    /** Sets the idle timeout (ms) before a cached connection is validated again. */
    public void setIdleCheckTimeout(long idleCheckTimeout) {
        this.idleCheckTimeout = idleCheckTimeout;
    }

    /** Toggles per-thread connection caching on or off. */
    void switchPerThreadMode(boolean perThreadMode) {
        this.perThreadMode = perThreadMode;
    }

    /** {@inheritDoc} */
    @Override
    public Connection getConnection() throws SQLException {
        if (perThreadMode) {
            // Reuse (or lazily create/refresh) this thread's cached connection.
            ConnectionHolder holder = this.holder.get();

            holder.initIfNeeded();

            return holder;
        }
        else
            return createConnection();
    }

    /** {@inheritDoc} */
    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        throw new SQLFeatureNotSupportedException();
    }

    /** {@inheritDoc} */
    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        throw new SQLFeatureNotSupportedException();
    }

    /** {@inheritDoc} */
    @Override
    public int getLoginTimeout() throws SQLException {
        throw new SQLFeatureNotSupportedException();
    }

    /** {@inheritDoc} */
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        throw new SQLFeatureNotSupportedException();
    }

    /** {@inheritDoc} */
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        throw new SQLFeatureNotSupportedException();
    }

    /** {@inheritDoc} */
    @Override
    public PrintWriter getLogWriter() throws SQLException {
        throw new SQLFeatureNotSupportedException();
    }

    /** {@inheritDoc} */
    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        throw new SQLFeatureNotSupportedException();
    }

    /** {@inheritDoc} */
    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new SQLFeatureNotSupportedException();
    }

    /** Opens a new physical JDBC connection via {@link DriverManager}. */
    private Connection createConnection() throws SQLException {
        return DriverManager.getConnection(url, username, password);
    }

    /**
     * Thread-bound connection wrapper: delegates all calls to the underlying
     * {@link #conn}, but makes {@code close()} a no-op while per-thread mode
     * is active so the cached connection survives across store operations.
     */
    private class ConnectionHolder implements Connection {
        /** Underlying physical connection (lazily created). */
        private Connection conn;

        /** Timestamp of the last access, used for the idle-validation check. */
        private long lastAccessTs = System.currentTimeMillis();

        /** Creates the underlying connection, or replaces it if it was closed or went stale while idle. */
        void initIfNeeded() throws SQLException {
            if (conn == null || conn.isClosed())
                conn = createConnection();
            else if (System.currentTimeMillis() - lastAccessTs > idleCheckTimeout && !conn.isValid(5_000)) {
                conn.close();

                conn = createConnection();
            }

            lastAccessTs = System.currentTimeMillis();
        }

        /** {@inheritDoc} */
        @Override
        public void close() throws SQLException {
            // In per-thread mode the connection is cached for reuse, so close() is a no-op.
            if (!perThreadMode)
                conn.close();
        }

        // --- Delegation. ---

        /** {@inheritDoc} */
        @Override
        public boolean isClosed() throws SQLException {
            return conn.isClosed();
        }

        /** {@inheritDoc} */
        @Override
        public Statement createStatement() throws SQLException {
            return conn.createStatement();
        }

        /** {@inheritDoc} */
        @Override
        public PreparedStatement prepareStatement(String sql) throws SQLException {
            return conn.prepareStatement(sql);
        }

        /** {@inheritDoc} */
        @Override
        public CallableStatement prepareCall(String sql) throws SQLException {
            return conn.prepareCall(sql);
        }

        /** {@inheritDoc} */
        @Override
        public String nativeSQL(String sql) throws SQLException {
            return conn.nativeSQL(sql);
        }

        /** {@inheritDoc} */
        @Override
        public void setAutoCommit(boolean autoCommit) throws SQLException {
            conn.setAutoCommit(autoCommit);
        }

        /** {@inheritDoc} */
        @Override
        public boolean getAutoCommit() throws SQLException {
            return conn.getAutoCommit();
        }

        /** {@inheritDoc} */
        @Override
        public void commit() throws SQLException {
            conn.commit();
        }

        /** {@inheritDoc} */
        @Override
        public void rollback() throws SQLException {
            conn.rollback();
        }

        /** {@inheritDoc} */
        @Override
        public DatabaseMetaData getMetaData() throws SQLException {
            return conn.getMetaData();
        }

        /** {@inheritDoc} */
        @Override
        public void setReadOnly(boolean readOnly) throws SQLException {
            conn.setReadOnly(readOnly);
        }

        /** {@inheritDoc} */
        @Override
        public boolean isReadOnly() throws SQLException {
            return conn.isReadOnly();
        }

        /** {@inheritDoc} */
        @Override
        public void setCatalog(String catalog) throws SQLException {
            conn.setCatalog(catalog);
        }

        /** {@inheritDoc} */
        @Override
        public String getCatalog() throws SQLException {
            return conn.getCatalog();
        }

        /** {@inheritDoc} */
        @Override
        public void setTransactionIsolation(int level) throws SQLException {
            conn.setTransactionIsolation(level);
        }

        /** {@inheritDoc} */
        @Override
        public int getTransactionIsolation() throws SQLException {
            return conn.getTransactionIsolation();
        }

        /** {@inheritDoc} */
        @Override
        public SQLWarning getWarnings() throws SQLException {
            return conn.getWarnings();
        }

        /** {@inheritDoc} */
        @Override
        public void clearWarnings() throws SQLException {
            conn.clearWarnings();
        }

        /** {@inheritDoc} */
        @Override
        public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
            return conn.createStatement(resultSetType, resultSetConcurrency);
        }

        /** {@inheritDoc} */
        @Override
        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
            return conn.prepareStatement(sql, resultSetType, resultSetConcurrency);
        }

        /** {@inheritDoc} */
        @Override
        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
            return conn.prepareCall(sql, resultSetType, resultSetConcurrency);
        }

        /** {@inheritDoc} */
        @Override
        public Map<String, Class<?>> getTypeMap() throws SQLException {
            return conn.getTypeMap();
        }

        /** {@inheritDoc} */
        @Override
        public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
            conn.setTypeMap(map);
        }

        /** {@inheritDoc} */
        @Override
        public void setHoldability(int holdability) throws SQLException {
            conn.setHoldability(holdability);
        }

        /** {@inheritDoc} */
        @Override
        public int getHoldability() throws SQLException {
            return conn.getHoldability();
        }

        /** {@inheritDoc} */
        @Override
        public Savepoint setSavepoint() throws SQLException {
            return conn.setSavepoint();
        }

        /** {@inheritDoc} */
        @Override
        public Savepoint setSavepoint(String name) throws SQLException {
            return conn.setSavepoint(name);
        }

        /** {@inheritDoc} */
        @Override
        public void rollback(Savepoint savepoint) throws SQLException {
            conn.rollback(savepoint);
        }

        /** {@inheritDoc} */
        @Override
        public void releaseSavepoint(Savepoint savepoint) throws SQLException {
            conn.releaseSavepoint(savepoint);
        }

        /** {@inheritDoc} */
        @Override
        public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
            return conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
        }

        /** {@inheritDoc} */
        @Override
        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
            return conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
        }

        /** {@inheritDoc} */
        @Override
        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
            return conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
        }

        /** {@inheritDoc} */
        @Override
        public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
            return conn.prepareStatement(sql, autoGeneratedKeys);
        }

        /** {@inheritDoc} */
        @Override
        public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
            return conn.prepareStatement(sql, columnIndexes);
        }

        /** {@inheritDoc} */
        @Override
        public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
            return conn.prepareStatement(sql, columnNames);
        }

        /** {@inheritDoc} */
        @Override
        public Clob createClob() throws SQLException {
            return conn.createClob();
        }

        /** {@inheritDoc} */
        @Override
        public Blob createBlob() throws SQLException {
            return conn.createBlob();
        }

        /** {@inheritDoc} */
        @Override
        public NClob createNClob() throws SQLException {
            return conn.createNClob();
        }

        /** {@inheritDoc} */
        @Override
        public SQLXML createSQLXML() throws SQLException {
            return conn.createSQLXML();
        }

        /** {@inheritDoc} */
        @Override
        public boolean isValid(int timeout) throws SQLException {
            return conn.isValid(timeout);
        }

        /** {@inheritDoc} */
        @Override
        public void setClientInfo(String name, String value) throws SQLClientInfoException {
            conn.setClientInfo(name, value);
        }

        /** {@inheritDoc} */
        @Override
        public void setClientInfo(Properties properties) throws SQLClientInfoException {
            conn.setClientInfo(properties);
        }

        /** {@inheritDoc} */
        @Override
        public String getClientInfo(String name) throws SQLException {
            return conn.getClientInfo(name);
        }

        /** {@inheritDoc} */
        @Override
        public Properties getClientInfo() throws SQLException {
            return conn.getClientInfo();
        }

        /** {@inheritDoc} */
        @Override
        public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
            return conn.createArrayOf(typeName, elements);
        }

        /** {@inheritDoc} */
        @Override
        public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
            return conn.createStruct(typeName, attributes);
        }

        /** {@inheritDoc} */
        @Override
        public void setSchema(String schema) throws SQLException {
            conn.setSchema(schema);
        }

        /** {@inheritDoc} */
        @Override
        public String getSchema() throws SQLException {
            return conn.getSchema();
        }

        /** {@inheritDoc} */
        @Override
        public void abort(Executor executor) throws SQLException {
            conn.abort(executor);
        }

        /** {@inheritDoc} */
        @Override
        public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
            conn.setNetworkTimeout(executor, milliseconds);
        }

        /** {@inheritDoc} */
        @Override
        public int getNetworkTimeout() throws SQLException {
            return conn.getNetworkTimeout();
        }

        /** {@inheritDoc} */
        @Override
        public <T> T unwrap(Class<T> iface) throws SQLException {
            return conn.unwrap(iface);
        }

        /** {@inheritDoc} */
        @Override
        public boolean isWrapperFor(Class<?> iface) throws SQLException {
            return conn.isWrapperFor(iface);
        }
    }
}
+
+
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSourceFactory.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSourceFactory.java
new file mode 100644
index 0000000..340751f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoDataSourceFactory.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.util.Objects;
+import javax.cache.configuration.Factory;
+import javax.sql.DataSource;
+
+/**
+ * Test JDBC POJO DataSource factory.
+ */
+public class TestJdbcPojoDataSourceFactory implements Factory<DataSource> {
+ /** */
+ private String URL;
+
+ /** */
+ private String userName;
+
+ /** */
+ private String password;
+
+ /** {@inheritDoc} */
+ @Override public DataSource create() {
+ TestJdbcPojoDataSource ds = new TestJdbcPojoDataSource();
+
+ ds.setUrl("jdbc:h2:mem:TestDatabase;DB_CLOSE_DELAY=-1");
+
+ ds.setUsername("sa");
+
+ ds.setPassword("");
+
+ return ds;
+ }
+
+ /** */
+ public String getURL() {
+ return URL;
+ }
+
+ /** */
+ public void setURL(String URL) {
+ this.URL = URL;
+ }
+
+ /** */
+ public String getUserName() {
+ return userName;
+ }
+
+ /** */
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ /** */
+ public String getPassword() {
+ return password;
+ }
+
+ /** */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ TestJdbcPojoDataSourceFactory factory = (TestJdbcPojoDataSourceFactory)o;
+ return Objects.equals(URL, factory.URL) &&
+ Objects.equals(userName, factory.userName) &&
+ Objects.equals(password, factory.password);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+
+ return Objects.hash(URL, userName, password);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "TestJdbcPojoDataSourceFactory{" +
+ "URL='" + URL + '\'' +
+ ", userName='" + userName + '\'' +
+ ", password='" + password + '\'' +
+ '}';
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoStoreFactoryWithHangWriteAll.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoStoreFactoryWithHangWriteAll.java
new file mode 100644
index 0000000..ebe93f3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestJdbcPojoStoreFactoryWithHangWriteAll.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.sql.PreparedStatement;
+import java.util.Collection;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.sql.DataSource;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory;
+import org.apache.ignite.cache.store.jdbc.JdbcTypeField;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test JDBC POJO Store Factory With Hang WriteAll Method..
+ */
+public class TestJdbcPojoStoreFactoryWithHangWriteAll<K, V> extends CacheJdbcPojoStoreFactory<K, V> {
+ /** */
+ private static long count = 0;
+
+ /** {@inheritDoc} */
+ @Override public CacheJdbcPojoStore<K, V> create() {
+ CacheJdbcPojoStore<K, V> store = new TestJdbcPojoStoreWithHangWriteAll<>();
+
+ store.setBatchSize(getBatchSize());
+ store.setDialect(getDialect());
+ store.setMaximumPoolSize(getMaximumPoolSize());
+ store.setMaximumWriteAttempts(getMaximumWriteAttempts());
+ store.setParallelLoadCacheMinimumThreshold(getParallelLoadCacheMinimumThreshold());
+ store.setTypes(getTypes());
+ store.setHasher(getHasher());
+ store.setTransformer(getTransformer());
+ store.setSqlEscapeAll(isSqlEscapeAll());
+ store.setDataSource(getDataSourceFactory().create());
+
+ return store;
+ }
+
+ /** */
+ public static class TestJdbcPojoStoreWithHangWriteAll<K,V> extends CacheJdbcPojoStore<K,V> {
+ /** {@inheritDoc} */
+ @Override protected void fillParameter(PreparedStatement stmt, int idx, JdbcTypeField field, @Nullable Object fieldVal) throws CacheException {
+ try {
+ super.fillParameter(stmt, idx, field, fieldVal);
+ }
+ catch (Exception e) {
+ log.error("Failed to fill parameter [idx=" + idx + ", field=" + field + ", val=" + fieldVal + ']', e);
+
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) throws CacheLoaderException {
+ DataSource ds = getDataSource();
+
+ try {
+ if (ds instanceof TestJdbcPojoDataSource)
+ ((TestJdbcPojoDataSource)ds).switchPerThreadMode(false);
+
+ super.loadCache(clo, args);
+ }
+ finally {
+ if (ds instanceof TestJdbcPojoDataSource)
+ ((TestJdbcPojoDataSource)ds).switchPerThreadMode(true);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ try {
+ super.delete(key);
+ }
+ catch (Exception e) {
+ log.error("Failed to delete entry from cache store: " + key, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
+ try {
+ super.deleteAll(keys);
+ }
+ catch (Exception e) {
+ log.error("Failed to delete entries from cache store: " + keys, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+ try {
+ super.write(entry);
+ }
+ catch (Exception e) {
+ log.error("Failed to write entry to cache store: " + entry, e);
+ }
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException {
+ try {
+ super.writeAll(entries);
+
+ Thread.sleep(10000);
+
+ count += entries.size();
+
+ log.info("Count of load data: " + count);
+ }
+ catch (Exception e) {
+ log.error("Failed to write entries to cache store: " + entries, e);
+ }
+ }
+ }
+}
+
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestPojo.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestPojo.java
new file mode 100644
index 0000000..0423629
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/model/TestPojo.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import java.sql.Date;
+
+/**
+ * Test JDBC POJO object.
+ */
+public class TestPojo implements Serializable {
+ /** */
+ @QuerySqlField
+ private String value1;
+
+ /** */
+ @QuerySqlField
+ private Integer value2;
+
+ /** */
+ @QuerySqlField
+ private Date value3;
+
+ /**
+ * Default constructor.
+ */
+ public TestPojo() {
+ // No-op.
+ }
+
+ /** */
+ public TestPojo(String value1, int value2, Date value3) {
+ this.value1 = value1;
+
+ this.value2 = value2;
+
+ this.value3 = value3;
+ }
+
+ /** */
+ public String getValue1() {
+ return value1;
+ }
+
+ /** */
+ public void setValue1(String value1) {
+ this.value1 = value1;
+ }
+
+ /** */
+ public Integer getValue2() {
+ return value2;
+ }
+
+ /** */
+ public void setValue2(Integer value2) {
+ this.value2 = value2;
+ }
+
+ /** */
+ public Date getValue3() {
+ return value3;
+ }
+
+ /** */
+ public void setValue3(Date value3) {
+ this.value3 = value3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestPojo pogo = (TestPojo)o;
+
+ return Objects.equals(value1, pogo.value1) &&
+ Objects.equals(value2, pogo.value2) &&
+ Objects.equals(value3, pogo.value3);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+
+ return Objects.hash(value1, value2, value3);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "TestPojo{" +
+ "value1='" + value1 + '\'' +
+ ", value2=" + value2 +
+ ", value3=" + value3 +
+ '}';
+ }
+}
+
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
index b242ac9..46adbde 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.store;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -168,12 +169,15 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore
assertEquals("v1", store.load(1));
assertEquals("v2", store.load(2));
assertNull(store.load(3));
+ assertEquals(store.loadAll(Arrays.asList(3, 4, 5)).size(), 0);
store.delete(1);
assertNull(store.load(1));
+ assertEquals(store.loadAll(Arrays.asList(1)).size(), 0);
assertEquals("v2", store.load(2));
assertNull(store.load(3));
+ assertEquals(store.loadAll(Arrays.asList(3)).size(), 0);
}
finally {
shutdownStore();
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 07d26aa..dd773e1 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerStor
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerWithSqlEscapeSelfTest;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreMultitreadedSelfTest;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoWriteBehindStoreWithCoalescingTest;
import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
import org.apache.ignite.cache.store.jdbc.JdbcTypesDefaultTransformerTest;
@@ -243,6 +244,7 @@ public class IgniteCacheTestSuite {
GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreBinaryMarshallerWithSqlEscapeSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreBinaryMarshallerStoreKeepBinaryWithSqlEscapeSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreMultitreadedSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoWriteBehindStoreWithCoalescingTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheBalancingStoreSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheAffinityApiSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheStoreValueBytesSelfTest.class, ignoredTests);