You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/01/13 07:25:52 UTC

incubator-ignite git commit: # IGNITE-32 WIP: Store implementation batch size and pool size.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-32 fcfe5019d -> eaeca2e5a


# IGNITE-32 WIP: Store implementation batch size and pool size.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eaeca2e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eaeca2e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eaeca2e5

Branch: refs/heads/ignite-32
Commit: eaeca2e5ae95c4f7bc9ef5fffe33ca7cee081057
Parents: fcfe501
Author: AKuznetsov <ak...@gridgain.com>
Authored: Tue Jan 13 13:26:16 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Tue Jan 13 13:26:16 2015 +0700

----------------------------------------------------------------------
 .../grid/cache/store/auto/AutoCacheStore.java   | 320 ++++++++++++-------
 1 file changed, 200 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eaeca2e5/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java b/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java
index ff7974b..ee56152 100644
--- a/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java
+++ b/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java
@@ -54,8 +54,8 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
         /** Remove item(s) query. */
         protected final String remQry;
 
-        /** Batch size for load query. */
-        protected final int loadBatchSize;
+        /** Max key count for load query per statement. */
+        protected final int maxKeysPerStmt;
 
         /** Database table name. */
         private final String tblName;
@@ -70,10 +70,10 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
         private final Set<String> uniqCols;
 
         /** Mapper for key. */
-        protected JdbcMapper<K> keyMapper;
+        protected final JdbcMapper<K> keyMapper;
 
         /** Mapper for value. */
-        protected JdbcMapper<V> valMapper;
+        protected final JdbcMapper<V> valMapper;
 
         /**
          *
@@ -96,9 +96,9 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
 
             loadQrySingle = loadQuery(tblName, keyCols, valCols, 1);
 
-            loadBatchSize = MAX_QRY_PARAMETERS / keyCols.size();
+            maxKeysPerStmt = maxParamsCnt / keyCols.size();
 
-            loadQry = loadQuery(tblName, keyCols, uniqCols, loadBatchSize);
+            loadQry = loadQuery(tblName, keyCols, uniqCols, maxKeysPerStmt);
 
             putQry = putQuery(tblName, keyCols, uniqCols);
 
@@ -110,13 +110,13 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
         }
 
         /**
-         * Construct query for select values with key count less or equal {@code loadBatchSize}
+         * Construct query to select values with key count less than or equal to {@code maxKeysPerStmt}.
          * @param keyCnt Key count.
          */
         protected String loadQueryLast(int keyCnt) {
-            assert keyCnt >= loadBatchSize;
+            assert keyCnt >= maxKeysPerStmt;
 
-            if (keyCnt == loadBatchSize)
+            if (keyCnt == maxKeysPerStmt)
                 return loadQry;
 
             if (keyCnt == 1)
@@ -126,8 +126,11 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
         }
     }
 
-    /** Max query parameters count. */
-    protected static final int MAX_QRY_PARAMETERS = 2000;
+    /** Default max query parameters count. */
+    protected static final int DFLT_MAX_PARAMS_CNT = 2000;
+
+    /** Default batch size for put and remove operations. */
+    protected static final int DFLT_BATCH_SIZE = 512;
 
     /** Connection attribute name. */
     protected static final String ATTR_CONN = "JDBC_STORE_CONNECTION";
@@ -165,7 +168,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
     protected String passwd;
 
     /** Execute. */
-    protected ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    protected ExecutorService exec;
 
     /** Paths to xml with type mapping description. */
     protected Collection<String> typeMetadataPaths;
@@ -176,6 +179,15 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
     /** Type cache. */
     protected Map<Object, TypeCache> typesCache;
 
+    /** Max worker thread count. These threads are responsible for executing queries. */
+    protected int maxPoolSz = Runtime.getRuntime().availableProcessors();
+
+    /** Max query parameters count. */
+    protected int maxParamsCnt = DFLT_MAX_PARAMS_CNT;
+
+    /** Maximum batch size for put and remove operations. */
+    protected int batchSz = DFLT_BATCH_SIZE;
+
     /**
      * Initializes store.
      *
@@ -219,6 +231,8 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
                         setTypeMetadata(typeMeta);
                     }
 
+                    exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+
                     buildTypeCache();
 
                     initOk = true;
@@ -422,87 +436,6 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
     }
 
     /**
-     * @return Data source.
-     */
-    public DataSource getDataSource() {
-        return dataSrc;
-    }
-
-    /**
-     * @param dataSrc Data source.
-     */
-    public void setDataSource(DataSource dataSrc) {
-        this.dataSrc = dataSrc;
-    }
-
-    /**
-     * @return Connection URL.
-     */
-    public String getConnUrl() {
-        return connUrl;
-    }
-
-    /**
-     * @param connUrl Connection URL.
-     */
-    public void setConnUrl(String connUrl) {
-        this.connUrl = connUrl;
-    }
-
-    /**
-     * @return Password for database access.
-     */
-    public String getPassword() {
-        return passwd;
-    }
-
-    /**
-     * @param passwd Password for database access.
-     */
-    public void setPassword(String passwd) {
-        this.passwd = passwd;
-    }
-
-    /**
-     * @return User name for database access.
-     */
-    public String getUser() {
-        return user;
-    }
-
-    /**
-     * @param user User name for database access.
-     */
-    public void setUser(String user) {
-        this.user = user;
-    }
-
-    /**
-     * @return Paths to xml with type mapping description.
-     */
-    public Collection<String> getTypeMetadataPaths() {
-        return typeMetadataPaths;
-    }
-
-    /**
-     * Set paths to xml with type mapping description.
-     *
-     * @param typeMetadataPaths Paths to xml.
-     */
-    public void setTypeMetadataPaths(Collection<String> typeMetadataPaths) {
-        this.typeMetadataPaths = typeMetadataPaths;
-    }
-
-    /**
-     * Set type mapping description.
-     *
-     * @param typeMetadata Type mapping description.
-     */
-    public void setTypeMetadata(Collection<GridCacheQueryTypeMetadata> typeMetadata) {
-        this.typeMetadata = typeMetadata;
-    }
-
-    /**
      * Construct load cache query.
      *
      * @param tblName Database table name.
@@ -525,7 +458,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
     protected String loadQuery(String tblName, Collection<String> keyCols, Iterable<String> valCols, int keyCnt) {
         assert !keyCols.isEmpty();
 
-        assert keyCols.size() * keyCnt <= MAX_QRY_PARAMETERS;
+        assert keyCols.size() * keyCnt <= maxParamsCnt;
 
         SB sb = new SB(String.format("SELECT %s FROM %s WHERE ", mkString(valCols, ","), tblName));
 
@@ -606,37 +539,37 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
         for (final TypeCache type : typesCache.values())
             futs.add(exec.submit(new Callable<Void>() {
                 @Override public Void call() throws Exception {
-                    Connection conn = null;
+                Connection conn = null;
 
-                    try {
-                        PreparedStatement stmt = null;
+                try {
+                    PreparedStatement stmt = null;
 
-                        try {
-                            conn = connection(null);
+                    try {
+                        conn = connection(null);
 
-                            stmt = conn.prepareStatement(type.loadCacheQry);
+                        stmt = conn.prepareStatement(type.loadCacheQry);
 
-                            ResultSet rs = stmt.executeQuery();
+                        ResultSet rs = stmt.executeQuery();
 
-                            while (rs.next()) {
-                                K key = type.keyMapper.readObject(ignite, rs);
-                                V val = type.valMapper.readObject(ignite, rs);
+                        while (rs.next()) {
+                            K key = type.keyMapper.readObject(ignite, rs);
+                            V val = type.valMapper.readObject(ignite, rs);
 
-                                clo.apply(key, val);
-                            }
-                        }
-                        catch (SQLException e) {
-                            throw new IgniteCheckedException("Failed to load cache", e);
-                        }
-                        finally {
-                            U.closeQuiet(stmt);
+                            clo.apply(key, val);
                         }
                     }
+                    catch (SQLException e) {
+                        throw new IgniteCheckedException("Failed to load cache", e);
+                    }
                     finally {
-                        closeConnection(conn);
+                        U.closeQuiet(stmt);
                     }
+                }
+                finally {
+                    closeConnection(conn);
+                }
 
-                    return null;
+                return null;
                 }
             }));
 
@@ -734,7 +667,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public final void loadAll(@Nullable final IgniteTx tx, Collection<? extends K> keys,
+    @Override public void loadAll(@Nullable final IgniteTx tx, Collection<? extends K> keys,
         final IgniteBiInClosure<K, V> c)
         throws IgniteCheckedException {
         Map<Object, Collection<K>> splittedKeys = U.newHashMap(typesCache.size());
@@ -751,7 +684,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
 
             batch.add(key);
 
-            if (batch.size() == typesCache.get(typeKey).loadBatchSize) {
+            if (batch.size() == typesCache.get(typeKey).maxKeysPerStmt) {
                 final Collection<K> p = splittedKeys.remove(typeKey);
 
                 futs.add(exec.submit(new Callable<Void>() {
@@ -840,15 +773,21 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
 
             stmt = conn.prepareStatement(type.putQry);
 
+            int cnt = 0;
+
             for (Map.Entry<? extends K, ? extends V> entry : map) {
                 int startIdx = type.keyMapper.setParameters(stmt, 1, entry.getKey());
 
                 type.valMapper.setParameters(stmt, startIdx, entry.getValue());
 
                 stmt.addBatch();
+
+                if (cnt++ % batchSz == 0)
+                    stmt.executeBatch();
             }
 
-            stmt.executeBatch();
+            if (cnt % batchSz != 0)
+                stmt.executeBatch();
         }
         catch (SQLException e) {
             throw new IgniteCheckedException("Failed to put objects", e);
@@ -859,7 +798,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public final void putAll(@Nullable final IgniteTx tx, Map<? extends K, ? extends V> map)
+    @Override public void putAll(@Nullable final IgniteTx tx, Map<? extends K, ? extends V> map)
         throws IgniteCheckedException {
         Map<Object, Collection<Map.Entry<? extends K, ? extends V>>> keyByType = U.newHashMap(typesCache.size());
 
@@ -953,13 +892,19 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
 
             stmt = conn.prepareStatement(type.remQry);
 
+            int cnt = 0;
+
             for (K key : keys) {
                 type.keyMapper.setParameters(stmt, 1, key);
 
                 stmt.addBatch();
+
+                if (cnt++ % batchSz == 0)
+                    stmt.executeBatch();
             }
 
-            stmt.executeBatch();
+            if (cnt % batchSz != 0)
+                stmt.executeBatch();
         }
         catch (SQLException e) {
             throw new IgniteCheckedException("Failed to remove values by keys.", e);
@@ -970,7 +915,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public final void removeAll(@Nullable IgniteTx tx, Collection<? extends K> keys)
+    @Override public void removeAll(@Nullable IgniteTx tx, Collection<? extends K> keys)
         throws IgniteCheckedException {
         Map<Object, Collection<K>> keyByType = U.newHashMap(typesCache.size());
 
@@ -988,4 +933,139 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
         for (Map.Entry<?, Collection<K>> entry : keyByType.entrySet())
             removeAll(tx, entry.getKey(), entry.getValue());
     }
+
+    /**
+     * @return Data source.
+     */
+    public DataSource getDataSource() {
+        return dataSrc;
+    }
+
+    /**
+     * @param dataSrc Data source.
+     */
+    public void setDataSource(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+    }
+
+    /**
+     * @return Connection URL.
+     */
+    public String getConnUrl() {
+        return connUrl;
+    }
+
+    /**
+     * @param connUrl Connection URL.
+     */
+    public void setConnUrl(String connUrl) {
+        this.connUrl = connUrl;
+    }
+
+    /**
+     * @return Password for database access.
+     */
+    public String getPassword() {
+        return passwd;
+    }
+
+    /**
+     * @param passwd Password for database access.
+     */
+    public void setPassword(String passwd) {
+        this.passwd = passwd;
+    }
+
+    /**
+     * @return User name for database access.
+     */
+    public String getUser() {
+        return user;
+    }
+
+    /**
+     * @param user User name for database access.
+     */
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    /**
+     * @return Paths to xml with type mapping description.
+     */
+    public Collection<String> getTypeMetadataPaths() {
+        return typeMetadataPaths;
+    }
+
+    /**
+     * Set paths to xml with type mapping description.
+     *
+     * @param typeMetadataPaths Paths to xml.
+     */
+    public void setTypeMetadataPaths(Collection<String> typeMetadataPaths) {
+        this.typeMetadataPaths = typeMetadataPaths;
+    }
+
+    /**
+     * Set type mapping description.
+     *
+     * @param typeMetadata Type mapping description.
+     */
+    public void setTypeMetadata(Collection<GridCacheQueryTypeMetadata> typeMetadata) {
+        this.typeMetadata = typeMetadata;
+    }
+
+    /**
+     * Get max worker thread count. These threads are responsible for executing queries.
+     *
+     * @return Max worker thread count.
+     */
+    public int getMaxPoolSize() {
+        return maxPoolSz;
+    }
+
+    /**
+     * Set max worker thread count. These threads are responsible for executing queries.
+     *
+     * @param maxPoolSz Max worker thread count.
+     */
+    public void setMaxPoolSize(int maxPoolSz) {
+        this.maxPoolSz = maxPoolSz;
+    }
+
+    /**
+     * Get max query parameters count.
+     *
+     * @return Max query parameters count.
+     */
+    public int getMaxParamsCnt() {
+        return maxParamsCnt;
+    }
+
+    /**
+     * Set max query parameters count.
+     *
+     * @param maxParamsCnt Max query parameters count.
+     */
+    public void setMaxParamsCnt(int maxParamsCnt) {
+        this.maxParamsCnt = maxParamsCnt;
+    }
+
+    /**
+     * Get maximum batch size for put and remove operations.
+     *
+     * @return Maximum batch size.
+     */
+    public int getBatchSize() {
+        return batchSz;
+    }
+
+    /**
+     * Set maximum batch size for put and remove operations.
+     *
+     * @param batchSz Maximum batch size.
+     */
+    public void setBatchSize(int batchSz) {
+        this.batchSz = batchSz;
+    }
 }