You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/02/03 03:25:58 UTC

incubator-ignite git commit: # IGNITE-32: Fixes after review.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-32 707aaf533 -> 8e5c0a8af


# IGNITE-32: Fixes after review.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8e5c0a8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8e5c0a8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8e5c0a8a

Branch: refs/heads/ignite-32
Commit: 8e5c0a8af10e39dd4f70d9fcecb8c60d0b3d7822
Parents: 707aaf5
Author: AKuznetsov <ak...@gridgain.com>
Authored: Tue Feb 3 09:26:03 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Tue Feb 3 09:26:03 2015 +0700

----------------------------------------------------------------------
 .../ignite/cache/store/jdbc/JdbcCacheStore.java | 259 ++++++++++++-------
 1 file changed, 161 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e5c0a8a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
index 4774d16..33461b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/JdbcCacheStore.java
@@ -169,8 +169,6 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
     protected JdbcDialect resolveDialect() throws CacheException {
         Connection conn = null;
 
-        // TODO check conn.getMetaData().getURL() will work ???
-
         String dbProductName = null;
 
         try {
@@ -612,6 +610,44 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
         }
     }
 
+    /**
+     * Updates the row for the entry or, if no row was updated, inserts it; retries on duplicate key errors.
+     * @param insStmt Insert statement.
+     * @param updStmt Update statement.
+     * @param em Entry mapping.
+     * @param entry Cache entry.
+     */
+    private void writeUpsert(PreparedStatement insStmt, PreparedStatement updStmt,
+        EntryMapping em, Cache.Entry<? extends K, ? extends V> entry) throws SQLException {
+        for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) {
+            int i = fillValueParameters(updStmt, 1, em, entry.getValue());
+
+            fillKeyParameters(updStmt, i, em, entry.getKey());
+
+            if (updStmt.executeUpdate() == 0) {
+                i = fillKeyParameters(insStmt, em, entry.getKey());
+
+                fillValueParameters(insStmt, i, em, entry.getValue());
+
+                try {
+                    insStmt.executeUpdate();
+                }
+                catch (SQLException e) {
+                    // 23505 (H2 vendor code, and SQLSTATE in several RDBMS) means a unique index or primary key
+                    // violation: the row presumably appeared after the update, so retry. TODO check with all RDBMS
+                    if (e.getErrorCode() == 23505 || "23505".equals(e.getSQLState()))
+                        continue;
+
+                    throw e;
+                }
+            }
+
+            return;
+        }
+
+        throw new CacheWriterException("Failed write entry to database: " + entry);
+    }
+
     /** {@inheritDoc} */
     @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
         assert entry != null;
@@ -625,64 +661,49 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
 
         Connection conn = null;
 
-        PreparedStatement stmt = null;
-
         try {
             conn = connection();
 
             if (dialect.hasMerge()) {
-                stmt = conn.prepareStatement(em.mergeQry);
+                PreparedStatement stmt = null;
+
+                try {
+                    stmt = conn.prepareStatement(em.mergeQry);
 
-                int i = fillKeyParameters(stmt, em, key);
+                    int i = fillKeyParameters(stmt, em, key);
 
-                fillValueParameters(stmt, i, em, entry.getValue());
+                    fillValueParameters(stmt, i, em, entry.getValue());
 
-                stmt.executeUpdate();
+                    stmt.executeUpdate();
+                }
+                finally {
+                    U.closeQuiet(stmt);
+                }
             }
             else {
-                V val = entry.getValue();
-
-                for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) {
-                    stmt = conn.prepareStatement(em.updQry);
+                PreparedStatement insStmt = null;
 
-                    int i = fillValueParameters(stmt, 1, em, val);
+                PreparedStatement updStmt = null;
 
-                    fillKeyParameters(stmt, i, em, key);
-
-                    if (stmt.executeUpdate() == 0) {
-                        U.closeQuiet(stmt);
-
-                        stmt = conn.prepareStatement(em.insQry);
-
-                        i = fillKeyParameters(stmt, em, key);
-
-                        fillValueParameters(stmt, i, em, val);
+                try {
+                    insStmt = conn.prepareStatement(em.insQry);
 
-                        try {
-                            stmt.executeUpdate();
-                        }
-                        catch (SQLException e) {
-                            // The error with code 23505 is thrown when trying to insert a row that
-                            // would violate a unique index or primary key.
-                            // TODO check with all RDBMS
-                            if (e.getErrorCode() == 23505)
-                                continue;
-
-                            throw e;
-                        }
-                    }
+                    updStmt = conn.prepareStatement(em.updQry);
 
-                    return;
+                    writeUpsert(insStmt, updStmt, em, entry);
                 }
+                finally {
+                    U.closeQuiet(insStmt);
 
-                throw new CacheWriterException("Failed write entry to database: " + entry);
+                    U.closeQuiet(updStmt);
+                }
             }
         }
         catch (SQLException e) {
             throw new CacheWriterException("Failed to write entry to database: " + entry, e);
         }
         finally {
-            end(conn, stmt);
+            closeConnection(conn);
         }
     }
 
@@ -691,68 +712,101 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
         throws CacheWriterException {
         assert entries != null;
 
-        if (dialect.hasMerge()) {
-            Connection conn = null;
+        Connection conn = null;
 
-            PreparedStatement mergeStmt = null;
+        try {
+            conn = connection();
 
-            try {
-                conn = connection();
+            Object currKeyTypeId = null;
 
-                Object currKeyTypeId = null;
+            if (dialect.hasMerge()) {
+                PreparedStatement mergeStmt = null;
 
-                int cnt = 0;
+                try {
+                    int fromIdx = 0, prepared = 0;
 
-                for (Cache.Entry<? extends K, ? extends V> entry : entries) {
-                    K key = entry.getKey();
+                    for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+                        K key = entry.getKey();
 
-                    Object keyTypeId = keyTypeId(key);
+                        Object keyTypeId = keyTypeId(key);
 
-                    EntryMapping em = entryMapping(keyTypeId, key);
+                        EntryMapping em = entryMapping(keyTypeId, key);
 
-                    if (mergeStmt == null) {
-                        mergeStmt = conn.prepareStatement(em.mergeQry);
+                        if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
+                            if (mergeStmt != null) {
+                                executeBatch(mergeStmt, "writeAll", fromIdx, prepared, entries);
 
-                        currKeyTypeId = keyTypeId;
-                    }
+                                U.closeQuiet(mergeStmt);
+                            }
 
-                    if (!currKeyTypeId.equals(keyTypeId)) {
-                        executeBatch(mergeStmt, "writeAll", cnt);
+                            mergeStmt = conn.prepareStatement(em.mergeQry);
 
-                        currKeyTypeId = keyTypeId;
+                            currKeyTypeId = keyTypeId;
 
-                        cnt = 0;
-                    }
+                            prepared = 0;
+                        }
 
-                    int i = fillKeyParameters(mergeStmt, em, key);
+                        int i = fillKeyParameters(mergeStmt, em, key);
 
-                    fillValueParameters(mergeStmt, i, em, entry.getValue());
+                        fillValueParameters(mergeStmt, i, em, entry.getValue());
 
-                    mergeStmt.addBatch();
+                        mergeStmt.addBatch();
 
-                    if (++cnt % batchSz == 0) {
-                        executeBatch(mergeStmt, "writeAll", cnt);
+                        if (++prepared % batchSz == 0) {
+                            executeBatch(mergeStmt, "writeAll", fromIdx, prepared, entries);
 
-                        cnt = 0;
+                            prepared = 0;
+                        }
                     }
-                }
 
-                if (mergeStmt != null && cnt % batchSz != 0)
-                    executeBatch(mergeStmt, "writeAll", cnt);
-            }
-            catch (SQLException e) {
-                throw new CacheWriterException("Failed to write entries in database", e);
+                    if (mergeStmt != null && prepared % batchSz != 0)
+                        executeBatch(mergeStmt, "writeAll", fromIdx, prepared, entries);
+                }
+                finally {
+                    U.closeQuiet(mergeStmt);
+                }
             }
-            finally {
-                U.closeQuiet(mergeStmt);
+            else {
+                PreparedStatement insStmt = null;
+
+                PreparedStatement updStmt = null;
+
+                try {
+                    for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+                        K key = entry.getKey();
+
+                        Object keyTypeId = keyTypeId(key);
+
+                        EntryMapping em = entryMapping(keyTypeId, key);
 
-                closeConnection(conn);
+                        if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
+                            U.closeQuiet(insStmt);
+
+                            insStmt = conn.prepareStatement(em.insQry);
+
+                            U.closeQuiet(updStmt);
+
+                            updStmt = conn.prepareStatement(em.updQry);
+
+                            currKeyTypeId = keyTypeId;
+                        }
+
+                        writeUpsert(insStmt, updStmt, em, entry);
+                    }
+                }
+                finally {
+                    U.closeQuiet(insStmt);
+
+                    U.closeQuiet(updStmt);
+                }
             }
         }
-        else
-            for (Cache.Entry<? extends K, ? extends V> e : entries)
-                write(e); // TODO rework to optimal usage. Method write will get all params each time (conn, stmt).
-                          // split into 2 methods writeMerge + writeUpsert.
+        catch (SQLException e) {
+            throw new CacheWriterException("Failed to write entries in database", e);
+        }
+        finally {
+            closeConnection(conn);
+        }
     }
 
     /** {@inheritDoc} */
@@ -788,24 +842,29 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
 
     /**
      * @param stmt Statement.
-     * @param stmtType Statement type for error message.
-     * @param batchSz Expected batch size.
+     * @param stmtType Statement description for error message.
+     * @param fromIdx Index in {@code objects} of the first object in the executed batch.
+     * @param prepared Number of objects added to the batch, i.e. the expected number of row counts.
+     * @param objects All objects; those with an unexpected row count are printed in warnings.
      */
-    private void executeBatch(Statement stmt, String stmtType, int batchSz) throws SQLException {
+    private void executeBatch(Statement stmt, String stmtType, int fromIdx, int prepared, Collection<?> objects)
+        throws SQLException {
         int[] rowCounts = stmt.executeBatch();
 
         int numOfRowCnt = rowCounts.length;
 
-        if (numOfRowCnt != batchSz)
-            log.warning("JDBC driver did not return the expected number of row counts," +
-                " actual row count: " + numOfRowCnt + " expected: " + batchSz);
+        if (numOfRowCnt != prepared)
+            log.warning("JDBC driver did not return the expected number of updated row counts," +
+                " actual row count: " + numOfRowCnt + " expected: " + prepared);
+
+        Object[] arr = null;
+
+        for (int i = 0; i < numOfRowCnt; i++)
+            if (rowCounts[i] != 1 && rowCounts[i] != Statement.SUCCESS_NO_INFO) { // Success, count unknown.
+                if (arr == null)
+                    arr = objects.toArray();
 
-        for (int rowCount : rowCounts)
-            if (rowCount != 1) {
-                // TODO stmtType used 2 times?
-                // TODO print warning for all keys.
-                // TODO while failed - convert collection to array and print warnins to each failed key.
-                log.warning("Batch " + stmtType + " returned unexpected row count from " + stmtType + " statement");
+                log.warning("Batch " + stmtType + " returned unexpected updated row count for: " + arr[fromIdx + i]);
             }
     }
 
@@ -822,7 +881,7 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
 
             PreparedStatement delStmt = null;
 
-            int cnt = 0;
+            int fromIdx = 0, prepared = 0;
 
             for (Object key : keys) {
                 Object keyTypeId = keyTypeId(key);
@@ -836,9 +895,11 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
                 }
 
                 if (!currKeyTypeId.equals(keyTypeId)) {
-                    executeBatch(delStmt, "deleteAll", cnt);
+                    executeBatch(delStmt, "deleteAll", fromIdx, prepared, keys);
 
-                    cnt = 0;
+                    fromIdx += prepared;
+
+                    prepared = 0;
 
                     currKeyTypeId = keyTypeId;
                 }
@@ -847,15 +908,17 @@ public abstract class JdbcCacheStore<K, V> extends CacheStore<K, V> implements L
 
                 delStmt.addBatch();
 
-                if (++cnt % batchSz == 0) {
-                    executeBatch(delStmt, "deleteAll", cnt);
+                if (++prepared % batchSz == 0) {
+                    executeBatch(delStmt, "deleteAll", fromIdx, prepared, keys);
+
+                    fromIdx += prepared;
 
-                    cnt = 0;
+                    prepared = 0;
                 }
             }
 
-            if (delStmt != null && cnt % batchSz != 0)
-                executeBatch(delStmt, "deleteAll", cnt);
+            if (delStmt != null && prepared % batchSz != 0)
+                executeBatch(delStmt, "deleteAll", fromIdx, prepared, keys);
         }
         catch (SQLException e) {
             throw new CacheWriterException("Failed to remove values from database", e);