You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/03/27 14:48:01 UTC

[ignite] branch master updated: IGNITE-12826: JdbcDialect#setFetchSize added. (#7577)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 50547b9  IGNITE-12826: JdbcDialect#setFetchSize added. (#7577)
50547b9 is described below

commit 50547b9b59f1e1a0da1f117877cdb17e8aa155d7
Author: Nikolay <ni...@apache.org>
AuthorDate: Fri Mar 27 17:47:41 2020 +0300

    IGNITE-12826: JdbcDialect#setFetchSize added. (#7577)
---
 modules/core/pom.xml                               |  2 +-
 .../cache/store/jdbc/CacheAbstractJdbcStore.java   |  8 +++--
 .../cache/store/jdbc/dialect/BasicJdbcDialect.java | 14 +++++++-
 .../cache/store/jdbc/dialect/MySQLDialect.java     | 14 ++++----
 .../jdbc/CacheJdbcPojoStoreAbstractSelfTest.java   | 42 ++++++++++++++++++++--
 modules/hibernate-4.2/pom.xml                      |  2 +-
 modules/hibernate-5.1/pom.xml                      |  2 +-
 modules/hibernate-5.3/pom.xml                      |  2 +-
 modules/spring/pom.xml                             |  7 ++++
 parent/pom.xml                                     |  1 +
 10 files changed, 77 insertions(+), 17 deletions(-)

diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index 568f7c9..8359345 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -91,7 +91,7 @@
         <dependency>
             <groupId>commons-dbcp</groupId>
             <artifactId>commons-dbcp</artifactId>
-            <version>1.4</version>
+            <version>${commons.dbcp.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index b1ec38d..ac99dbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -681,6 +681,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
         throws CacheLoaderException {
         ExecutorService pool = null;
 
+        int fetchSz = dialect.getFetchSize();
+
         String cacheName = session().cacheName();
 
         try {
@@ -774,7 +776,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                                 for (int i = 0; i < keyCnt; i++)
                                     upperBound[i] = rs.getObject(i + 1);
 
-                                futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound, 0)));
+                                futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound, fetchSz)));
 
                                 while (rs.next()) {
                                     Object[] lowerBound = upperBound;
@@ -784,10 +786,10 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                                     for (int i = 0; i < keyCnt; i++)
                                         upperBound[i] = rs.getObject(i + 1);
 
-                                    futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound, 0)));
+                                    futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound, fetchSz)));
                                 }
 
-                                futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, 0)));
+                                futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, fetchSz)));
 
                                 continue;
                             }
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
index 139f3fc..59361c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
@@ -35,6 +35,9 @@ public class BasicJdbcDialect implements JdbcDialect {
     /** Max query parameters count. */
     protected int maxParamsCnt = DFLT_MAX_PARAMS_CNT;
 
+    /** Fetch size. */
+    protected int fetchSize;
+
     /**
      * Concatenates elements using provided separator.
      *
@@ -288,6 +291,15 @@ public class BasicJdbcDialect implements JdbcDialect {
 
     /** {@inheritDoc} */
     @Override public int getFetchSize() {
-        return 0;
+        return fetchSize;
+    }
+
+    /**
+     * Sets fetch size.
+     *
+     * @param fetchSize Fetch size.
+     */
+    public void setFetchSize(int fetchSize) {
+        this.fetchSize = fetchSize;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
index 1a5730b..31617d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
@@ -28,6 +28,13 @@ public class MySQLDialect extends BasicJdbcDialect {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    public MySQLDialect() {
+        // Workaround for known issue with MySQL large result set.
+        // See: http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
+        fetchSize = Integer.MIN_VALUE;
+    }
+
     /** {@inheritDoc} */
     @Override public String escape(String ident) {
         return '`' + ident + '`';
@@ -60,11 +67,4 @@ public class MySQLDialect extends BasicJdbcDialect {
         return String.format("INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", fullTblName,
             mkString(cols, ", "), repeat("?", cols.size(), "", ",", ""), updPart);
     }
-
-    /** {@inheritDoc} */
-    @Override public int getFetchSize() {
-        // Workaround for known issue with MySQL large result set.
-        // See: http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
-        return Integer.MIN_VALUE;
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
index 8be69f8..a10670e 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
@@ -27,6 +27,8 @@ import java.sql.Statement;
 import java.sql.Types;
 import java.util.Random;
 import javax.cache.integration.CacheLoaderException;
+import org.apache.commons.dbcp.DelegatingConnection;
+import org.apache.commons.dbcp.DelegatingPreparedStatement;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
 import org.apache.ignite.cache.store.jdbc.model.Gender;
@@ -59,6 +61,9 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
     /** Person count. */
     private static final int PERSON_CNT = 100000;
 
+    /** Fetch size. */
+    public static final int FETCH_SZ = 42;
+
     /** Test cache name. */
     private static final String CACHE_NAME = "test-cache";
 
@@ -74,6 +79,9 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
     /** Flag indicating that classes for values available on class path or not. */
     private static boolean noValClasses;
 
+    /** Flag indicating that fetch size should be checked. */
+    private static boolean checkFetchSize;
+
     /** Batch size to load in parallel. */
     private static int parallelLoadThreshold;
 
@@ -89,7 +97,20 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
      * @throws SQLException if failed to connect.
      */
     protected Connection getConnection() throws SQLException {
-        return DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+        return new DelegatingConnection(DriverManager.getConnection(DFLT_CONN_URL, "sa", "")) {
+            /** {@inheritDoc} */
+            @Override public PreparedStatement prepareStatement(String sql) throws SQLException {
+                return new DelegatingPreparedStatement(this, super.prepareStatement(sql)) {
+                    /** {@inheritDoc} */
+                    @Override public ResultSet executeQuery() throws SQLException {
+                        if (checkFetchSize)
+                            assertEquals(FETCH_SZ, getFetchSize());
+
+                        return super.executeQuery();
+                    }
+                };
+            }
+        };
     }
 
     /** {@inheritDoc} */
@@ -223,7 +244,12 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
         cc.setStoreKeepBinary(storeKeepBinary());
 
         CacheJdbcPojoStoreFactory<Object, Object> storeFactory = new CacheJdbcPojoStoreFactory<>();
-        storeFactory.setDialect(new H2Dialect());
+
+        H2Dialect dialect = new H2Dialect();
+
+        dialect.setFetchSize(FETCH_SZ);
+
+        storeFactory.setDialect(dialect);
         storeFactory.setTypes(storeTypes());
         storeFactory.setDataSourceFactory(new H2DataSourceFactory()); // H2 DataSource factory.
         storeFactory.setSqlEscapeAll(sqlEscapeAll());
@@ -319,8 +345,12 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
     protected void checkCacheLoad() {
         IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
 
+        checkFetchSize = true;
+
         c1.loadCache(null);
 
+        checkFetchSize = false;
+
         assertEquals(ORGANIZATION_CNT + PERSON_CNT, c1.size());
     }
 
@@ -330,8 +360,12 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
     protected void checkCacheLoadWithSql() {
         IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
 
+        checkFetchSize = true;
+
         c1.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", "select id, org_id, name, birthday, gender from Person");
 
+        checkFetchSize = false;
+
         assertEquals(PERSON_CNT, c1.size());
     }
 
@@ -587,6 +621,8 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
         IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
 
         try {
+            checkFetchSize = true;
+
             c1.loadCache(null, "PersonKeyWrong", "SELECT * FROM Person");
         }
         catch (CacheLoaderException e) {
@@ -595,6 +631,8 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
             assertTrue("Unexpected exception: " + msg,
                 ("Provided key type is not found in store or cache configuration " +
                     "[cache=" + CACHE_NAME + ", key=PersonKeyWrong]").equals(msg));
+        } finally {
+            checkFetchSize = false;
         }
     }
 }
diff --git a/modules/hibernate-4.2/pom.xml b/modules/hibernate-4.2/pom.xml
index 9a3bf2f..1b5734f 100644
--- a/modules/hibernate-4.2/pom.xml
+++ b/modules/hibernate-4.2/pom.xml
@@ -70,7 +70,7 @@
         <dependency>
             <groupId>commons-dbcp</groupId>
             <artifactId>commons-dbcp</artifactId>
-            <version>1.4</version>
+            <version>${commons.dbcp.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/modules/hibernate-5.1/pom.xml b/modules/hibernate-5.1/pom.xml
index f45ea4d..053d4aa 100644
--- a/modules/hibernate-5.1/pom.xml
+++ b/modules/hibernate-5.1/pom.xml
@@ -70,7 +70,7 @@
         <dependency>
             <groupId>commons-dbcp</groupId>
             <artifactId>commons-dbcp</artifactId>
-            <version>1.4</version>
+            <version>${commons.dbcp.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/modules/hibernate-5.3/pom.xml b/modules/hibernate-5.3/pom.xml
index 1935433..11c5b8b 100644
--- a/modules/hibernate-5.3/pom.xml
+++ b/modules/hibernate-5.3/pom.xml
@@ -70,7 +70,7 @@
         <dependency>
             <groupId>commons-dbcp</groupId>
             <artifactId>commons-dbcp</artifactId>
-            <version>1.4</version>
+            <version>${commons.dbcp.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/modules/spring/pom.xml b/modules/spring/pom.xml
index 33cbfec..7c1e98f 100644
--- a/modules/spring/pom.xml
+++ b/modules/spring/pom.xml
@@ -125,6 +125,13 @@
         </dependency>
 
         <dependency>
+            <groupId>commons-dbcp</groupId>
+            <artifactId>commons-dbcp</artifactId>
+            <version>${commons.dbcp.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>com.thoughtworks.xstream</groupId>
             <artifactId>xstream</artifactId>
             <version>1.4.8</version>
diff --git a/parent/pom.xml b/parent/pom.xml
index d08a7ea..32fb624 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -61,6 +61,7 @@
         <commons.collections.version>3.2.2</commons.collections.version>
         <commons.lang.version>2.6</commons.lang.version>
         <commons.io.version>2.6</commons.io.version>
+        <commons.dbcp.version>1.4</commons.dbcp.version>
         <cron4j.version>2.2.5</cron4j.version>
         <curator.version>4.2.0</curator.version>
         <easymock.version>3.4</easymock.version>