You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/02/03 03:25:58 UTC
incubator-ignite git commit: # IGNITE-32: Fixes after review.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-32 707aaf533 -> 8e5c0a8af
# IGNITE-32: Fixes after review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8e5c0a8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8e5c0a8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8e5c0a8a
Branch: refs/heads/ignite-32
Commit: 8e5c0a8af10e39dd4f70d9fcecb8c60d0b3d7822
Parents: 707aaf5
Author: AKuznetsov <ak...@gridgain.com>
Authored: Tue Feb 3 09:26:03 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Tue Feb 3 09:26:03 2015 +0700
----------------------------------------------------------------------
.../ignite/cache/store/jdbc/JdbcCacheStore.java | 259 ++++++++++++-------
1 file changed, 161 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e5c0a8a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
index 4774d16..33461b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
@@ -169,8 +169,6 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
protected JdbcDialect resolveDialect() throws CacheException {
Connection conn = null;
- // TODO check conn.getMetaData().getURL() will work ???
-
String dbProductName = null;
try {
@@ -612,6 +610,44 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
}
}
+ /**
+ * @param insStmt Insert statement.
+ * @param updStmt Update statement.
+ * @param em Entry mapping.
+ * @param entry Cache entry.
+ */
+ private void writeUpsert(PreparedStatement insStmt, PreparedStatement updStmt,
+ EntryMapping em, Cache.Entry<? extends K, ? extends V> entry) throws SQLException {
+ for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) {
+ int i = fillValueParameters(updStmt, 1, em, entry.getValue());
+
+ fillKeyParameters(updStmt, i, em, entry.getKey());
+
+ if (updStmt.executeUpdate() == 0) {
+ i = fillKeyParameters(insStmt, em, entry.getKey());
+
+ fillValueParameters(insStmt, i, em, entry.getValue());
+
+ try {
+ insStmt.executeUpdate();
+ }
+ catch (SQLException e) {
+ // The error with code 23505 is thrown when trying to insert a row that
+ // would violate a unique index or primary key.
+ // TODO check with all RDBMS
+ if (e.getErrorCode() == 23505)
+ continue;
+
+ throw e;
+ }
+ }
+
+ return;
+ }
+
+ throw new CacheWriterException("Failed write entry to database: " + entry);
+ }
+
/** {@inheritDoc} */
@Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
assert entry != null;
@@ -625,64 +661,49 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
Connection conn = null;
- PreparedStatement stmt = null;
-
try {
conn = connection();
if (dialect.hasMerge()) {
- stmt = conn.prepareStatement(em.mergeQry);
+ PreparedStatement stmt = null;
+
+ try {
+ stmt = conn.prepareStatement(em.mergeQry);
- int i = fillKeyParameters(stmt, em, key);
+ int i = fillKeyParameters(stmt, em, key);
- fillValueParameters(stmt, i, em, entry.getValue());
+ fillValueParameters(stmt, i, em, entry.getValue());
- stmt.executeUpdate();
+ stmt.executeUpdate();
+ }
+ finally {
+ U.closeQuiet(stmt);
+ }
}
else {
- V val = entry.getValue();
-
- for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) {
- stmt = conn.prepareStatement(em.updQry);
+ PreparedStatement insStmt = null;
- int i = fillValueParameters(stmt, 1, em, val);
+ PreparedStatement updStmt = null;
- fillKeyParameters(stmt, i, em, key);
-
- if (stmt.executeUpdate() == 0) {
- U.closeQuiet(stmt);
-
- stmt = conn.prepareStatement(em.insQry);
-
- i = fillKeyParameters(stmt, em, key);
-
- fillValueParameters(stmt, i, em, val);
+ try {
+ insStmt = conn.prepareStatement(em.insQry);
- try {
- stmt.executeUpdate();
- }
- catch (SQLException e) {
- // The error with code 23505 is thrown when trying to insert a row that
- // would violate a unique index or primary key.
- // TODO check with all RDBMS
- if (e.getErrorCode() == 23505)
- continue;
-
- throw e;
- }
- }
+ updStmt = conn.prepareStatement(em.updQry);
- return;
+ writeUpsert(insStmt, updStmt, em, entry);
}
+ finally {
+ U.closeQuiet(insStmt);
- throw new CacheWriterException("Failed write entry to database: " + entry);
+ U.closeQuiet(updStmt);
+ }
}
}
catch (SQLException e) {
throw new CacheWriterException("Failed to write entry to database: " + entry, e);
}
finally {
- end(conn, stmt);
+ closeConnection(conn);
}
}
@@ -691,68 +712,101 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
throws CacheWriterException {
assert entries != null;
- if (dialect.hasMerge()) {
- Connection conn = null;
+ Connection conn = null;
- PreparedStatement mergeStmt = null;
+ try {
+ conn = connection();
- try {
- conn = connection();
+ Object currKeyTypeId = null;
- Object currKeyTypeId = null;
+ if (dialect.hasMerge()) {
+ PreparedStatement mergeStmt = null;
- int cnt = 0;
+ try {
+ int fromIdx = 0, prepared = 0;
- for (Cache.Entry<? extends K, ? extends V> entry : entries) {
- K key = entry.getKey();
+ for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+ K key = entry.getKey();
- Object keyTypeId = keyTypeId(key);
+ Object keyTypeId = keyTypeId(key);
- EntryMapping em = entryMapping(keyTypeId, key);
+ EntryMapping em = entryMapping(keyTypeId, key);
- if (mergeStmt == null) {
- mergeStmt = conn.prepareStatement(em.mergeQry);
+ if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
+ if (mergeStmt != null) {
+ executeBatch(mergeStmt, "writeAll", fromIdx, prepared, entries);
- currKeyTypeId = keyTypeId;
- }
+ U.closeQuiet(mergeStmt);
+ }
- if (!currKeyTypeId.equals(keyTypeId)) {
- executeBatch(mergeStmt, "writeAll", cnt);
+ mergeStmt = conn.prepareStatement(em.mergeQry);
- currKeyTypeId = keyTypeId;
+ currKeyTypeId = keyTypeId;
- cnt = 0;
- }
+ prepared = 0;
+ }
- int i = fillKeyParameters(mergeStmt, em, key);
+ int i = fillKeyParameters(mergeStmt, em, key);
- fillValueParameters(mergeStmt, i, em, entry.getValue());
+ fillValueParameters(mergeStmt, i, em, entry.getValue());
- mergeStmt.addBatch();
+ mergeStmt.addBatch();
- if (++cnt % batchSz == 0) {
- executeBatch(mergeStmt, "writeAll", cnt);
+ if (++prepared % batchSz == 0) {
+ executeBatch(mergeStmt, "writeAll", fromIdx, prepared, entries);
- cnt = 0;
+ prepared = 0;
+ }
}
- }
- if (mergeStmt != null && cnt % batchSz != 0)
- executeBatch(mergeStmt, "writeAll", cnt);
- }
- catch (SQLException e) {
- throw new CacheWriterException("Failed to write entries in database", e);
+ if (mergeStmt != null && prepared % batchSz != 0)
+ executeBatch(mergeStmt, "writeAll", fromIdx, prepared, entries);
+ }
+ finally {
+ U.closeQuiet(mergeStmt);
+ }
}
- finally {
- U.closeQuiet(mergeStmt);
+ else {
+ PreparedStatement insStmt = null;
+
+ PreparedStatement updStmt = null;
+
+ try {
+ for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+ K key = entry.getKey();
+
+ Object keyTypeId = keyTypeId(key);
+
+ EntryMapping em = entryMapping(keyTypeId, key);
- closeConnection(conn);
+ if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
+ U.closeQuiet(insStmt);
+
+ insStmt = conn.prepareStatement(em.insQry);
+
+ U.closeQuiet(updStmt);
+
+ updStmt = conn.prepareStatement(em.updQry);
+
+ currKeyTypeId = keyTypeId;
+ }
+
+ writeUpsert(insStmt, updStmt, em, entry);
+ }
+ }
+ finally {
+ U.closeQuiet(insStmt);
+
+ U.closeQuiet(updStmt);
+ }
}
}
- else
- for (Cache.Entry<? extends K, ? extends V> e : entries)
- write(e); // TODO rework to optimal usage. Method write will get all params each time (conn, stmt).
- // split into 2 methods writeMerge + writeUpsert.
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to write entries in database", e);
+ }
+ finally {
+ closeConnection(conn);
+ }
}
/** {@inheritDoc} */
@@ -788,24 +842,29 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
/**
* @param stmt Statement.
- * @param stmtType Statement type for error message.
- * @param batchSz Expected batch size.
+ * @param stmtType Statement description for error message.
+ * @param fromIdx Objects in batch start from index.
+ * @param prepared Expected objects in batch.
+ * @param objects All objects.
*/
- private void executeBatch(Statement stmt, String stmtType, int batchSz) throws SQLException {
+ private void executeBatch(Statement stmt, String stmtType, int fromIdx, int prepared, Collection<?> objects)
+ throws SQLException {
int[] rowCounts = stmt.executeBatch();
int numOfRowCnt = rowCounts.length;
- if (numOfRowCnt != batchSz)
- log.warning("JDBC driver did not return the expected number of row counts," +
- " actual row count: " + numOfRowCnt + " expected: " + batchSz);
+ if (numOfRowCnt != prepared)
+ log.warning("JDBC driver did not return the expected number of updated row counts," +
+ " actual row count: " + numOfRowCnt + " expected: " + prepared);
+
+ Object[] arr = null;
+
+ for (int i = 0; i < numOfRowCnt; i++)
+ if (rowCounts[i] != 1) {
+ if (arr == null)
+ arr = objects.toArray();
- for (int rowCount : rowCounts)
- if (rowCount != 1) {
- // TODO stmtType used 2 times?
- // TODO print warning for all keys.
- // TODO while failed - convert collection to array and print warnins to each failed key.
- log.warning("Batch " + stmtType + " returned unexpected row count from " + stmtType + " statement");
+ log.warning("Batch " + stmtType + " returned unexpected updated row count for: " + arr[fromIdx + i]);
}
}
@@ -822,7 +881,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
PreparedStatement delStmt = null;
- int cnt = 0;
+ int fromIdx = 0, prepared = 0;
for (Object key : keys) {
Object keyTypeId = keyTypeId(key);
@@ -836,9 +895,11 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
}
if (!currKeyTypeId.equals(keyTypeId)) {
- executeBatch(delStmt, "deleteAll", cnt);
+ executeBatch(delStmt, "deleteAll", fromIdx, prepared, keys);
- cnt = 0;
+ fromIdx += prepared;
+
+ prepared = 0;
currKeyTypeId = keyTypeId;
}
@@ -847,15 +908,17 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
delStmt.addBatch();
- if (++cnt % batchSz == 0) {
- executeBatch(delStmt, "deleteAll", cnt);
+ if (++prepared % batchSz == 0) {
+ executeBatch(delStmt, "deleteAll", fromIdx, prepared, keys);
+
+ fromIdx += prepared;
- cnt = 0;
+ prepared = 0;
}
}
- if (delStmt != null && cnt % batchSz != 0)
- executeBatch(delStmt, "deleteAll", cnt);
+ if (delStmt != null && prepared % batchSz != 0)
+ executeBatch(delStmt, "deleteAll", fromIdx, prepared, keys);
}
catch (SQLException e) {
throw new CacheWriterException("Failed to remove values from database", e);