You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/03/27 14:48:01 UTC
[ignite] branch master updated: IGNITE-12826:
JdbcDialect#setFetchSize added. (#7577)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 50547b9 IGNITE-12826: JdbcDialect#setFetchSize added. (#7577)
50547b9 is described below
commit 50547b9b59f1e1a0da1f117877cdb17e8aa155d7
Author: Nikolay <ni...@apache.org>
AuthorDate: Fri Mar 27 17:47:41 2020 +0300
IGNITE-12826: JdbcDialect#setFetchSize added. (#7577)
---
modules/core/pom.xml | 2 +-
.../cache/store/jdbc/CacheAbstractJdbcStore.java | 8 +++--
.../cache/store/jdbc/dialect/BasicJdbcDialect.java | 14 +++++++-
.../cache/store/jdbc/dialect/MySQLDialect.java | 14 ++++----
.../jdbc/CacheJdbcPojoStoreAbstractSelfTest.java | 42 ++++++++++++++++++++--
modules/hibernate-4.2/pom.xml | 2 +-
modules/hibernate-5.1/pom.xml | 2 +-
modules/hibernate-5.3/pom.xml | 2 +-
modules/spring/pom.xml | 7 ++++
parent/pom.xml | 1 +
10 files changed, 77 insertions(+), 17 deletions(-)
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index 568f7c9..8359345 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -91,7 +91,7 @@
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
- <version>1.4</version>
+ <version>${commons.dbcp.version}</version>
<scope>test</scope>
</dependency>
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index b1ec38d..ac99dbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -681,6 +681,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
throws CacheLoaderException {
ExecutorService pool = null;
+ int fetchSz = dialect.getFetchSize();
+
String cacheName = session().cacheName();
try {
@@ -774,7 +776,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
for (int i = 0; i < keyCnt; i++)
upperBound[i] = rs.getObject(i + 1);
- futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound, 0)));
+ futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound, fetchSz)));
while (rs.next()) {
Object[] lowerBound = upperBound;
@@ -784,10 +786,10 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
for (int i = 0; i < keyCnt; i++)
upperBound[i] = rs.getObject(i + 1);
- futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound, 0)));
+ futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound, fetchSz)));
}
- futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, 0)));
+ futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, fetchSz)));
continue;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
index 139f3fc..59361c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
@@ -35,6 +35,9 @@ public class BasicJdbcDialect implements JdbcDialect {
/** Max query parameters count. */
protected int maxParamsCnt = DFLT_MAX_PARAMS_CNT;
+ /** Fetch size. */
+ protected int fetchSize;
+
/**
* Concatenates elements using provided separator.
*
@@ -288,6 +291,15 @@ public class BasicJdbcDialect implements JdbcDialect {
/** {@inheritDoc} */
@Override public int getFetchSize() {
- return 0;
+ return fetchSize;
+ }
+
+ /**
+ * Sets fetch size.
+ *
+ * @param fetchSize Fetch size.
+ */
+ public void setFetchSize(int fetchSize) {
+ this.fetchSize = fetchSize;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
index 1a5730b..31617d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
@@ -28,6 +28,13 @@ public class MySQLDialect extends BasicJdbcDialect {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ public MySQLDialect() {
+ // Workaround for known issue with MySQL large result set.
+ // See: http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
+ fetchSize = Integer.MIN_VALUE;
+ }
+
/** {@inheritDoc} */
@Override public String escape(String ident) {
return '`' + ident + '`';
@@ -60,11 +67,4 @@ public class MySQLDialect extends BasicJdbcDialect {
return String.format("INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", fullTblName,
mkString(cols, ", "), repeat("?", cols.size(), "", ",", ""), updPart);
}
-
- /** {@inheritDoc} */
- @Override public int getFetchSize() {
- // Workaround for known issue with MySQL large result set.
- // See: http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
- return Integer.MIN_VALUE;
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
index 8be69f8..a10670e 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
@@ -27,6 +27,8 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.Random;
import javax.cache.integration.CacheLoaderException;
+import org.apache.commons.dbcp.DelegatingConnection;
+import org.apache.commons.dbcp.DelegatingPreparedStatement;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
import org.apache.ignite.cache.store.jdbc.model.Gender;
@@ -59,6 +61,9 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
/** Person count. */
private static final int PERSON_CNT = 100000;
+ /** Fetch size. */
+ public static final int FETCH_SZ = 42;
+
/** Test cache name. */
private static final String CACHE_NAME = "test-cache";
@@ -74,6 +79,9 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
/** Flag indicating that classes for values available on class path or not. */
private static boolean noValClasses;
+ /** Flag indicating that fetch size should be checked. */
+ private static boolean checkFetchSize;
+
/** Batch size to load in parallel. */
private static int parallelLoadThreshold;
@@ -89,7 +97,20 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
* @throws SQLException if failed to connect.
*/
protected Connection getConnection() throws SQLException {
- return DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+ return new DelegatingConnection(DriverManager.getConnection(DFLT_CONN_URL, "sa", "")) {
+ /** {@inheritDoc} */
+ @Override public PreparedStatement prepareStatement(String sql) throws SQLException {
+ return new DelegatingPreparedStatement(this, super.prepareStatement(sql)) {
+ /** {@inheritDoc} */
+ @Override public ResultSet executeQuery() throws SQLException {
+ if (checkFetchSize)
+ assertEquals(FETCH_SZ, getFetchSize());
+
+ return super.executeQuery();
+ }
+ };
+ }
+ };
}
/** {@inheritDoc} */
@@ -223,7 +244,12 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
cc.setStoreKeepBinary(storeKeepBinary());
CacheJdbcPojoStoreFactory<Object, Object> storeFactory = new CacheJdbcPojoStoreFactory<>();
- storeFactory.setDialect(new H2Dialect());
+
+ H2Dialect dialect = new H2Dialect();
+
+ dialect.setFetchSize(FETCH_SZ);
+
+ storeFactory.setDialect(dialect);
storeFactory.setTypes(storeTypes());
storeFactory.setDataSourceFactory(new H2DataSourceFactory()); // H2 DataSource factory.
storeFactory.setSqlEscapeAll(sqlEscapeAll());
@@ -319,8 +345,12 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
protected void checkCacheLoad() {
IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
+ checkFetchSize = true;
+
c1.loadCache(null);
+ checkFetchSize = false;
+
assertEquals(ORGANIZATION_CNT + PERSON_CNT, c1.size());
}
@@ -330,8 +360,12 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
protected void checkCacheLoadWithSql() {
IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
+ checkFetchSize = true;
+
c1.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", "select id, org_id, name, birthday, gender from Person");
+ checkFetchSize = false;
+
assertEquals(PERSON_CNT, c1.size());
}
@@ -587,6 +621,8 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
try {
+ checkFetchSize = true;
+
c1.loadCache(null, "PersonKeyWrong", "SELECT * FROM Person");
}
catch (CacheLoaderException e) {
@@ -595,6 +631,8 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
assertTrue("Unexpected exception: " + msg,
("Provided key type is not found in store or cache configuration " +
"[cache=" + CACHE_NAME + ", key=PersonKeyWrong]").equals(msg));
+ } finally {
+ checkFetchSize = false;
}
}
}
diff --git a/modules/hibernate-4.2/pom.xml b/modules/hibernate-4.2/pom.xml
index 9a3bf2f..1b5734f 100644
--- a/modules/hibernate-4.2/pom.xml
+++ b/modules/hibernate-4.2/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
- <version>1.4</version>
+ <version>${commons.dbcp.version}</version>
<scope>test</scope>
</dependency>
diff --git a/modules/hibernate-5.1/pom.xml b/modules/hibernate-5.1/pom.xml
index f45ea4d..053d4aa 100644
--- a/modules/hibernate-5.1/pom.xml
+++ b/modules/hibernate-5.1/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
- <version>1.4</version>
+ <version>${commons.dbcp.version}</version>
<scope>test</scope>
</dependency>
diff --git a/modules/hibernate-5.3/pom.xml b/modules/hibernate-5.3/pom.xml
index 1935433..11c5b8b 100644
--- a/modules/hibernate-5.3/pom.xml
+++ b/modules/hibernate-5.3/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
- <version>1.4</version>
+ <version>${commons.dbcp.version}</version>
<scope>test</scope>
</dependency>
diff --git a/modules/spring/pom.xml b/modules/spring/pom.xml
index 33cbfec..7c1e98f 100644
--- a/modules/spring/pom.xml
+++ b/modules/spring/pom.xml
@@ -125,6 +125,13 @@
</dependency>
<dependency>
+ <groupId>commons-dbcp</groupId>
+ <artifactId>commons-dbcp</artifactId>
+ <version>${commons.dbcp.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.4.8</version>
diff --git a/parent/pom.xml b/parent/pom.xml
index d08a7ea..32fb624 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -61,6 +61,7 @@
<commons.collections.version>3.2.2</commons.collections.version>
<commons.lang.version>2.6</commons.lang.version>
<commons.io.version>2.6</commons.io.version>
+ <commons.dbcp.version>1.4</commons.dbcp.version>
<cron4j.version>2.2.5</cron4j.version>
<curator.version>4.2.0</curator.version>
<easymock.version>3.4</easymock.version>