You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2020/03/26 14:49:39 UTC

[ignite] branch IGNITE-12826 created (now a655243)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a change to branch IGNITE-12826
in repository https://gitbox.apache.org/repos/asf/ignite.git.


      at a655243  IGNITE-12826: JdbcDialect#setFetchSize added.

This branch includes the following new commits:

     new a655243  IGNITE-12826: JdbcDialect#setFetchSize added.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite] 01/01: IGNITE-12826: JdbcDialect#setFetchSize added.

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch IGNITE-12826
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit a655243d1bedd5813a4a161df34e365b5e8c0463
Author: Nikolay Izhikov <ni...@apache.org>
AuthorDate: Thu Mar 26 17:49:05 2020 +0300

    IGNITE-12826: JdbcDialect#setFetchSize added.
---
 .../cache/store/jdbc/CacheAbstractJdbcStore.java   |  8 +++--
 .../cache/store/jdbc/dialect/BasicJdbcDialect.java | 14 +++++++-
 .../cache/store/jdbc/dialect/MySQLDialect.java     | 14 ++++----
 .../jdbc/CacheJdbcPojoStoreAbstractSelfTest.java   | 42 ++++++++++++++++++++--
 4 files changed, 65 insertions(+), 13 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index b1ec38d..ac99dbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -681,6 +681,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
         throws CacheLoaderException {
         ExecutorService pool = null;
 
+        int fetchSz = dialect.getFetchSize();
+
         String cacheName = session().cacheName();
 
         try {
@@ -774,7 +776,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                                 for (int i = 0; i < keyCnt; i++)
                                     upperBound[i] = rs.getObject(i + 1);
 
-                                futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound, 0)));
+                                futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound, fetchSz)));
 
                                 while (rs.next()) {
                                     Object[] lowerBound = upperBound;
@@ -784,10 +786,10 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                                     for (int i = 0; i < keyCnt; i++)
                                         upperBound[i] = rs.getObject(i + 1);
 
-                                    futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound, 0)));
+                                    futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound, fetchSz)));
                                 }
 
-                                futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, 0)));
+                                futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, fetchSz)));
 
                                 continue;
                             }
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
index 139f3fc..59361c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
@@ -35,6 +35,9 @@ public class BasicJdbcDialect implements JdbcDialect {
     /** Max query parameters count. */
     protected int maxParamsCnt = DFLT_MAX_PARAMS_CNT;
 
+    /** Fetch size. */
+    protected int fetchSize;
+
     /**
      * Concatenates elements using provided separator.
      *
@@ -288,6 +291,15 @@ public class BasicJdbcDialect implements JdbcDialect {
 
     /** {@inheritDoc} */
     @Override public int getFetchSize() {
-        return 0;
+        return fetchSize;
+    }
+
+    /**
+     * Sets fetch size.
+     *
+     * @param fetchSize Fetch size.
+     */
+    public void setFetchSize(int fetchSize) {
+        this.fetchSize = fetchSize;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
index 1a5730b..31617d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
@@ -28,6 +28,13 @@ public class MySQLDialect extends BasicJdbcDialect {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    public MySQLDialect() {
+        // Workaround for known issue with MySQL large result set.
+        // See: http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
+        fetchSize = Integer.MIN_VALUE;
+    }
+
     /** {@inheritDoc} */
     @Override public String escape(String ident) {
         return '`' + ident + '`';
@@ -60,11 +67,4 @@ public class MySQLDialect extends BasicJdbcDialect {
         return String.format("INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", fullTblName,
             mkString(cols, ", "), repeat("?", cols.size(), "", ",", ""), updPart);
     }
-
-    /** {@inheritDoc} */
-    @Override public int getFetchSize() {
-        // Workaround for known issue with MySQL large result set.
-        // See: http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
-        return Integer.MIN_VALUE;
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
index 8be69f8..a10670e 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
@@ -27,6 +27,8 @@ import java.sql.Statement;
 import java.sql.Types;
 import java.util.Random;
 import javax.cache.integration.CacheLoaderException;
+import org.apache.commons.dbcp.DelegatingConnection;
+import org.apache.commons.dbcp.DelegatingPreparedStatement;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
 import org.apache.ignite.cache.store.jdbc.model.Gender;
@@ -59,6 +61,9 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
     /** Person count. */
     private static final int PERSON_CNT = 100000;
 
+    /** Fetch size. */
+    public static final int FETCH_SZ = 42;
+
     /** Test cache name. */
     private static final String CACHE_NAME = "test-cache";
 
@@ -74,6 +79,9 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
     /** Flag indicating that classes for values available on class path or not. */
     private static boolean noValClasses;
 
+    /** Flag indicating that fetch size should be checked. */
+    private static boolean checkFetchSize;
+
     /** Batch size to load in parallel. */
     private static int parallelLoadThreshold;
 
@@ -89,7 +97,20 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
      * @throws SQLException if failed to connect.
      */
     protected Connection getConnection() throws SQLException {
-        return DriverManager.getConnection(DFLT_CONN_URL, "sa", "");
+        return new DelegatingConnection(DriverManager.getConnection(DFLT_CONN_URL, "sa", "")) {
+            /** {@inheritDoc} */
+            @Override public PreparedStatement prepareStatement(String sql) throws SQLException {
+                return new DelegatingPreparedStatement(this, super.prepareStatement(sql)) {
+                    /** {@inheritDoc} */
+                    @Override public ResultSet executeQuery() throws SQLException {
+                        if (checkFetchSize)
+                            assertEquals(FETCH_SZ, getFetchSize());
+
+                        return super.executeQuery();
+                    }
+                };
+            }
+        };
     }
 
     /** {@inheritDoc} */
@@ -223,7 +244,12 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
         cc.setStoreKeepBinary(storeKeepBinary());
 
         CacheJdbcPojoStoreFactory<Object, Object> storeFactory = new CacheJdbcPojoStoreFactory<>();
-        storeFactory.setDialect(new H2Dialect());
+
+        H2Dialect dialect = new H2Dialect();
+
+        dialect.setFetchSize(FETCH_SZ);
+
+        storeFactory.setDialect(dialect);
         storeFactory.setTypes(storeTypes());
         storeFactory.setDataSourceFactory(new H2DataSourceFactory()); // H2 DataSource factory.
         storeFactory.setSqlEscapeAll(sqlEscapeAll());
@@ -319,8 +345,12 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
     protected void checkCacheLoad() {
         IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
 
+        checkFetchSize = true;
+
         c1.loadCache(null);
 
+        checkFetchSize = false;
+
         assertEquals(ORGANIZATION_CNT + PERSON_CNT, c1.size());
     }
 
@@ -330,8 +360,12 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
     protected void checkCacheLoadWithSql() {
         IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
 
+        checkFetchSize = true;
+
         c1.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", "select id, org_id, name, birthday, gender from Person");
 
+        checkFetchSize = false;
+
         assertEquals(PERSON_CNT, c1.size());
     }
 
@@ -587,6 +621,8 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
         IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
 
         try {
+            checkFetchSize = true;
+
             c1.loadCache(null, "PersonKeyWrong", "SELECT * FROM Person");
         }
         catch (CacheLoaderException e) {
@@ -595,6 +631,8 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
             assertTrue("Unexpected exception: " + msg,
                 ("Provided key type is not found in store or cache configuration " +
                     "[cache=" + CACHE_NAME + ", key=PersonKeyWrong]").equals(msg));
+        } finally {
+            checkFetchSize = false;
         }
     }
 }