You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/08/22 08:09:04 UTC
[1/3] ignite git commit: IGNITE-3708 Fixed multithreaded loading entries for MySql.
Repository: ignite
Updated Branches:
refs/heads/master 2d4360707 -> 05c5939ae
IGNITE-3708 Fixed multithreaded loading entries for MySql.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d399db92
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d399db92
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d399db92
Branch: refs/heads/master
Commit: d399db92ab4e147a3933a42dd5635b225665ac63
Parents: 974467a
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Aug 22 15:00:06 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Aug 22 15:00:06 2016 +0700
----------------------------------------------------------------------
.../store/jdbc/CacheAbstractJdbcStore.java | 43 +++++++++++---------
.../store/jdbc/dialect/BasicJdbcDialect.java | 7 +++-
.../cache/store/jdbc/dialect/JdbcDialect.java | 11 ++++-
.../cache/store/jdbc/dialect/MySQLDialect.java | 18 +++++++-
.../jdbc/CacheJdbcPojoStoreFactorySelfTest.java | 11 +++--
5 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index c16f2c6..aad05e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -495,10 +495,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
* @param clo Closure that will be applied to loaded values.
* @param lowerBound Lower bound for range.
* @param upperBound Upper bound for range.
+ * @param fetchSize Number of rows to fetch from DB.
* @return Callable for pool submit.
*/
private Callable<Void> loadCacheRange(final EntryMapping em, final IgniteBiInClosure<K, V> clo,
- @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound) {
+ @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound, final int fetchSize) {
return new Callable<Void>() {
@Override public Void call() throws Exception {
Connection conn = null;
@@ -512,6 +513,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
? em.loadCacheQry
: em.loadCacheRangeQuery(lowerBound != null, upperBound != null));
+ stmt.setFetchSize(fetchSize);
+
int idx = 1;
if (lowerBound != null)
@@ -555,7 +558,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
* @return Callable for pool submit.
*/
private Callable<Void> loadCacheFull(EntryMapping m, IgniteBiInClosure<K, V> clo) {
- return loadCacheRange(m, clo, null, null);
+ return loadCacheRange(m, clo, null, null, dialect.getFetchSize());
}
/**
@@ -811,10 +814,6 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
for (EntryMapping em : entryMappings) {
if (parallelLoadCacheMinThreshold > 0) {
- if (log.isDebugEnabled())
- log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) +
- ", keyType=" + em.keyType() + " ]");
-
Connection conn = null;
try {
@@ -827,6 +826,10 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
+ if (log.isDebugEnabled())
+ log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + " ]");
+
int keyCnt = em.keyCols.size();
Object[] upperBound = new Object[keyCnt];
@@ -834,7 +837,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
for (int i = 0; i < keyCnt; i++)
upperBound[i] = rs.getObject(i + 1);
- futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound)));
+ futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound, 0)));
while (rs.next()) {
Object[] lowerBound = upperBound;
@@ -844,28 +847,28 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
for (int i = 0; i < keyCnt; i++)
upperBound[i] = rs.getObject(i + 1);
- futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound)));
+ futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound, 0)));
}
- futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null)));
+ futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null, 0)));
+
+ continue;
}
- else
- futs.add(pool.submit(loadCacheFull(em, clo)));
}
- catch (SQLException ignored) {
- futs.add(pool.submit(loadCacheFull(em, clo)));
+ catch (SQLException e) {
+ log.warning("Failed to load entries from db in multithreaded mode [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + " ]", e);
}
finally {
U.closeQuiet(conn);
}
}
- else {
- if (log.isDebugEnabled())
- log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) +
- ", keyType=" + em.keyType() + " ]");
- futs.add(pool.submit(loadCacheFull(em, clo)));
- }
+ if (log.isDebugEnabled())
+ log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + " ]");
+
+ futs.add(pool.submit(loadCacheFull(em, clo)));
}
}
@@ -1926,6 +1929,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
stmt = conn.prepareStatement(qry);
+ stmt.setFetchSize(dialect.getFetchSize());
+
ResultSet rs = stmt.executeQuery();
ResultSetMetaData meta = rs.getMetaData();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
index abb59d3..cd9c986 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
@@ -274,4 +274,9 @@ public class BasicJdbcDialect implements JdbcDialect {
public void setMaxParameterCount(int maxParamsCnt) {
this.maxParamsCnt = maxParamsCnt;
}
-}
\ No newline at end of file
+
+ /** {@inheritDoc} */
+ @Override public int getFetchSize() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
index 38e981f..9daa00b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
@@ -115,4 +115,13 @@ public interface JdbcDialect extends Serializable {
* @return Max query parameters count.
*/
public int getMaxParameterCount();
-}
\ No newline at end of file
+
+ /**
+ * Gives the JDBC driver a hint how many rows should be fetched from the database when more rows are needed.
+ * If the value specified is zero, then the hint is ignored.
+ * The default value is zero.
+ *
+ * @return The fetch size for result sets.
+ */
+ public int getFetchSize();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
index f7512a7..84e6d05 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
@@ -29,6 +29,15 @@ public class MySQLDialect extends BasicJdbcDialect {
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
+ @Override public String loadCacheSelectRangeQuery(String fullTblName, Collection<String> keyCols) {
+ String cols = mkString(keyCols, ",");
+
+ return String.format("SELECT %s " +
+ "FROM (SELECT %s, @rownum := @rownum + 1 AS rn FROM %s, (SELECT @rownum := 0) r ORDER BY %s) as r " +
+ "WHERE mod(rn, ?) = 0", cols, cols, fullTblName, cols);
+ }
+
+ /** {@inheritDoc} */
@Override public boolean hasMerge() {
return true;
}
@@ -48,4 +57,11 @@ public class MySQLDialect extends BasicJdbcDialect {
return String.format("INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", fullTblName,
mkString(cols, ", "), repeat("?", cols.size(), "", ",", ""), updPart);
}
-}
\ No newline at end of file
+
+ /** {@inheritDoc} */
+ @Override public int getFetchSize() {
+ // Workaround for known issue with MySQL large result set.
+ // See: http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
+ return Integer.MIN_VALUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d399db92/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java b/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
index 2f36017..dfa1452 100644
--- a/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactorySelfTest.java
@@ -129,9 +129,9 @@ public class CacheJdbcPojoStoreFactorySelfTest extends GridCommonAbstractTest {
}
/**
- *
+ * Dummy JDBC dialect that does nothing.
*/
- public static class DummyDialect implements JdbcDialect, Serializable {
+ public static class DummyDialect implements JdbcDialect {
/** {@inheritDoc} */
@Override public String loadCacheSelectRangeQuery(String fullTblName, Collection<String> keyCols) {
return null;
@@ -185,5 +185,10 @@ public class CacheJdbcPojoStoreFactorySelfTest extends GridCommonAbstractTest {
@Override public int getMaxParameterCount() {
return 0;
}
+
+ /** {@inheritDoc} */
+ @Override public int getFetchSize() {
+ return 0;
+ }
}
-}
\ No newline at end of file
+}
[2/3] ignite git commit: Merge branches 'ignite-1.6.6' and 'ignite-1.7.2'.
Posted by ak...@apache.org.
Merge branches 'ignite-1.6.6' and 'ignite-1.7.2'.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8d5b6ba3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8d5b6ba3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8d5b6ba3
Branch: refs/heads/master
Commit: 8d5b6ba35d6613e21af6c5043eeae089b159debe
Parents: fa374fb d399db9
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Aug 22 15:03:05 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Aug 22 15:03:05 2016 +0700
----------------------------------------------------------------------
.../store/jdbc/CacheAbstractJdbcStore.java | 43 +++++++++++---------
.../store/jdbc/dialect/BasicJdbcDialect.java | 7 +++-
.../cache/store/jdbc/dialect/JdbcDialect.java | 11 ++++-
.../cache/store/jdbc/dialect/MySQLDialect.java | 18 +++++++-
.../jdbc/CacheJdbcPojoStoreFactorySelfTest.java | 11 +++--
5 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8d5b6ba3/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
[3/3] ignite git commit: Merge branch 'ignite-1.7.2'.
Posted by ak...@apache.org.
Merge branch 'ignite-1.7.2'.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/05c5939a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/05c5939a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/05c5939a
Branch: refs/heads/master
Commit: 05c5939aeeb4a9eabd4a49a9a58c957c1bd73967
Parents: 2d43607 8d5b6ba
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Aug 22 15:09:43 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Aug 22 15:09:43 2016 +0700
----------------------------------------------------------------------
.../store/jdbc/CacheAbstractJdbcStore.java | 43 +++++++++++---------
.../store/jdbc/dialect/BasicJdbcDialect.java | 7 +++-
.../cache/store/jdbc/dialect/JdbcDialect.java | 11 ++++-
.../cache/store/jdbc/dialect/MySQLDialect.java | 18 +++++++-
.../jdbc/CacheJdbcPojoStoreFactorySelfTest.java | 11 +++--
5 files changed, 65 insertions(+), 25 deletions(-)
----------------------------------------------------------------------