You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/01/13 07:25:52 UTC
incubator-ignite git commit: # IGNITE-32 WIP: Store implementation
batch size and pool size.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-32 fcfe5019d -> eaeca2e5a
# IGNITE-32 WIP: Store implementation batch size and pool size.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eaeca2e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eaeca2e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eaeca2e5
Branch: refs/heads/ignite-32
Commit: eaeca2e5ae95c4f7bc9ef5fffe33ca7cee081057
Parents: fcfe501
Author: AKuznetsov <ak...@gridgain.com>
Authored: Tue Jan 13 13:26:16 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Tue Jan 13 13:26:16 2015 +0700
----------------------------------------------------------------------
.../grid/cache/store/auto/AutoCacheStore.java | 320 ++++++++++++-------
1 file changed, 200 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eaeca2e5/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java b/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java
index ff7974b..ee56152 100644
--- a/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java
+++ b/modules/core/src/main/java/org/gridgain/grid/cache/store/auto/AutoCacheStore.java
@@ -54,8 +54,8 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
/** Remove item(s) query. */
protected final String remQry;
- /** Batch size for load query. */
- protected final int loadBatchSize;
+ /** Max key count for load query per statement. */
+ protected final int maxKeysPerStmt;
/** Database table name. */
private final String tblName;
@@ -70,10 +70,10 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
private final Set<String> uniqCols;
/** Mapper for key. */
- protected JdbcMapper<K> keyMapper;
+ protected final JdbcMapper<K> keyMapper;
/** Mapper for value. */
- protected JdbcMapper<V> valMapper;
+ protected final JdbcMapper<V> valMapper;
/**
*
@@ -96,9 +96,9 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
loadQrySingle = loadQuery(tblName, keyCols, valCols, 1);
- loadBatchSize = MAX_QRY_PARAMETERS / keyCols.size();
+ maxKeysPerStmt = maxParamsCnt / keyCols.size();
- loadQry = loadQuery(tblName, keyCols, uniqCols, loadBatchSize);
+ loadQry = loadQuery(tblName, keyCols, uniqCols, maxKeysPerStmt);
putQry = putQuery(tblName, keyCols, uniqCols);
@@ -110,13 +110,13 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
}
/**
- * Construct query for select values with key count less or equal {@code loadBatchSize}
+ * Construct query to select values for a key count less than or equal to {@code maxKeysPerStmt}
* @param keyCnt Key count.
*/
protected String loadQueryLast(int keyCnt) {
- assert keyCnt >= loadBatchSize;
+ assert keyCnt >= maxKeysPerStmt;
- if (keyCnt == loadBatchSize)
+ if (keyCnt == maxKeysPerStmt)
return loadQry;
if (keyCnt == 1)
@@ -126,8 +126,11 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
}
}
- /** Max query parameters count. */
- protected static final int MAX_QRY_PARAMETERS = 2000;
+ /** Default max query parameters count. */
+ protected static final int DFLT_MAX_PARAMS_CNT = 2000;
+
+ /** Default batch size for put and remove operations. */
+ protected static final int DFLT_BATCH_SIZE = 512;
/** Connection attribute name. */
protected static final String ATTR_CONN = "JDBC_STORE_CONNECTION";
@@ -165,7 +168,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
protected String passwd;
/** Execute. */
- protected ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ protected ExecutorService exec;
/** Paths to xml with type mapping description. */
protected Collection<String> typeMetadataPaths;
@@ -176,6 +179,15 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
/** Type cache. */
protected Map<Object, TypeCache> typesCache;
+ /** Max worker thread count. These threads are responsible for executing queries. */
+ protected int maxPoolSz = Runtime.getRuntime().availableProcessors();
+
+ /** Max query parameters count. */
+ protected int maxParamsCnt = DFLT_MAX_PARAMS_CNT;
+
+ /** Maximum batch size for put and remove operations. */
+ protected int batchSz = DFLT_BATCH_SIZE;
+
/**
* Initializes store.
*
@@ -219,6 +231,8 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
setTypeMetadata(typeMeta);
}
+ exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+
buildTypeCache();
initOk = true;
@@ -422,87 +436,6 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
}
/**
- * @return Data source.
- */
- public DataSource getDataSource() {
- return dataSrc;
- }
-
- /**
- * @param dataSrc Data source.
- */
- public void setDataSource(DataSource dataSrc) {
- this.dataSrc = dataSrc;
- }
-
- /**
- * @return Connection URL.
- */
- public String getConnUrl() {
- return connUrl;
- }
-
- /**
- * @param connUrl Connection URL.
- */
- public void setConnUrl(String connUrl) {
- this.connUrl = connUrl;
- }
-
- /**
- * @return Password for database access.
- */
- public String getPassword() {
- return passwd;
- }
-
- /**
- * @param passwd Password for database access.
- */
- public void setPassword(String passwd) {
- this.passwd = passwd;
- }
-
- /**
- * @return User name for database access.
- */
- public String getUser() {
- return user;
- }
-
- /**
- * @param user User name for database access.
- */
- public void setUser(String user) {
- this.user = user;
- }
-
- /**
- * @return Paths to xml with type mapping description.
- */
- public Collection<String> getTypeMetadataPaths() {
- return typeMetadataPaths;
- }
-
- /**
- * Set paths to xml with type mapping description.
- *
- * @param typeMetadataPaths Paths to xml.
- */
- public void setTypeMetadataPaths(Collection<String> typeMetadataPaths) {
- this.typeMetadataPaths = typeMetadataPaths;
- }
-
- /**
- * Set type mapping description.
- *
- * @param typeMetadata Type mapping description.
- */
- public void setTypeMetadata(Collection<GridCacheQueryTypeMetadata> typeMetadata) {
- this.typeMetadata = typeMetadata;
- }
-
- /**
* Construct load cache query.
*
* @param tblName Database table name.
@@ -525,7 +458,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
protected String loadQuery(String tblName, Collection<String> keyCols, Iterable<String> valCols, int keyCnt) {
assert !keyCols.isEmpty();
- assert keyCols.size() * keyCnt <= MAX_QRY_PARAMETERS;
+ assert keyCols.size() * keyCnt <= maxParamsCnt;
SB sb = new SB(String.format("SELECT %s FROM %s WHERE ", mkString(valCols, ","), tblName));
@@ -606,37 +539,37 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
for (final TypeCache type : typesCache.values())
futs.add(exec.submit(new Callable<Void>() {
@Override public Void call() throws Exception {
- Connection conn = null;
+ Connection conn = null;
- try {
- PreparedStatement stmt = null;
+ try {
+ PreparedStatement stmt = null;
- try {
- conn = connection(null);
+ try {
+ conn = connection(null);
- stmt = conn.prepareStatement(type.loadCacheQry);
+ stmt = conn.prepareStatement(type.loadCacheQry);
- ResultSet rs = stmt.executeQuery();
+ ResultSet rs = stmt.executeQuery();
- while (rs.next()) {
- K key = type.keyMapper.readObject(ignite, rs);
- V val = type.valMapper.readObject(ignite, rs);
+ while (rs.next()) {
+ K key = type.keyMapper.readObject(ignite, rs);
+ V val = type.valMapper.readObject(ignite, rs);
- clo.apply(key, val);
- }
- }
- catch (SQLException e) {
- throw new IgniteCheckedException("Failed to load cache", e);
- }
- finally {
- U.closeQuiet(stmt);
+ clo.apply(key, val);
}
}
+ catch (SQLException e) {
+ throw new IgniteCheckedException("Failed to load cache", e);
+ }
finally {
- closeConnection(conn);
+ U.closeQuiet(stmt);
}
+ }
+ finally {
+ closeConnection(conn);
+ }
- return null;
+ return null;
}
}));
@@ -734,7 +667,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
}
/** {@inheritDoc} */
- @Override public final void loadAll(@Nullable final IgniteTx tx, Collection<? extends K> keys,
+ @Override public void loadAll(@Nullable final IgniteTx tx, Collection<? extends K> keys,
final IgniteBiInClosure<K, V> c)
throws IgniteCheckedException {
Map<Object, Collection<K>> splittedKeys = U.newHashMap(typesCache.size());
@@ -751,7 +684,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
batch.add(key);
- if (batch.size() == typesCache.get(typeKey).loadBatchSize) {
+ if (batch.size() == typesCache.get(typeKey).maxKeysPerStmt) {
final Collection<K> p = splittedKeys.remove(typeKey);
futs.add(exec.submit(new Callable<Void>() {
@@ -840,15 +773,21 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
stmt = conn.prepareStatement(type.putQry);
+ int cnt = 0;
+
for (Map.Entry<? extends K, ? extends V> entry : map) {
int startIdx = type.keyMapper.setParameters(stmt, 1, entry.getKey());
type.valMapper.setParameters(stmt, startIdx, entry.getValue());
stmt.addBatch();
+
+ if (cnt++ % batchSz == 0)
+ stmt.executeBatch();
}
- stmt.executeBatch();
+ if (cnt % batchSz != 0)
+ stmt.executeBatch();
}
catch (SQLException e) {
throw new IgniteCheckedException("Failed to put objects", e);
@@ -859,7 +798,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
}
/** {@inheritDoc} */
- @Override public final void putAll(@Nullable final IgniteTx tx, Map<? extends K, ? extends V> map)
+ @Override public void putAll(@Nullable final IgniteTx tx, Map<? extends K, ? extends V> map)
throws IgniteCheckedException {
Map<Object, Collection<Map.Entry<? extends K, ? extends V>>> keyByType = U.newHashMap(typesCache.size());
@@ -953,13 +892,19 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
stmt = conn.prepareStatement(type.remQry);
+ int cnt = 0;
+
for (K key : keys) {
type.keyMapper.setParameters(stmt, 1, key);
stmt.addBatch();
+
+ if (cnt++ % batchSz == 0)
+ stmt.executeBatch();
}
- stmt.executeBatch();
+ if (cnt % batchSz != 0)
+ stmt.executeBatch();
}
catch (SQLException e) {
throw new IgniteCheckedException("Failed to remove values by keys.", e);
@@ -970,7 +915,7 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
}
/** {@inheritDoc} */
- @Override public final void removeAll(@Nullable IgniteTx tx, Collection<? extends K> keys)
+ @Override public void removeAll(@Nullable IgniteTx tx, Collection<? extends K> keys)
throws IgniteCheckedException {
Map<Object, Collection<K>> keyByType = U.newHashMap(typesCache.size());
@@ -988,4 +933,139 @@ public abstract class AutoCacheStore<K, V> implements GridCacheStore<K, V> {
for (Map.Entry<?, Collection<K>> entry : keyByType.entrySet())
removeAll(tx, entry.getKey(), entry.getValue());
}
+
+ /**
+ * @return Data source.
+ */
+ public DataSource getDataSource() {
+ return dataSrc;
+ }
+
+ /**
+ * @param dataSrc Data source.
+ */
+ public void setDataSource(DataSource dataSrc) {
+ this.dataSrc = dataSrc;
+ }
+
+ /**
+ * @return Connection URL.
+ */
+ public String getConnUrl() {
+ return connUrl;
+ }
+
+ /**
+ * @param connUrl Connection URL.
+ */
+ public void setConnUrl(String connUrl) {
+ this.connUrl = connUrl;
+ }
+
+ /**
+ * @return Password for database access.
+ */
+ public String getPassword() {
+ return passwd;
+ }
+
+ /**
+ * @param passwd Password for database access.
+ */
+ public void setPassword(String passwd) {
+ this.passwd = passwd;
+ }
+
+ /**
+ * @return User name for database access.
+ */
+ public String getUser() {
+ return user;
+ }
+
+ /**
+ * @param user User name for database access.
+ */
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ /**
+ * @return Paths to xml with type mapping description.
+ */
+ public Collection<String> getTypeMetadataPaths() {
+ return typeMetadataPaths;
+ }
+
+ /**
+ * Set paths to xml with type mapping description.
+ *
+ * @param typeMetadataPaths Paths to xml.
+ */
+ public void setTypeMetadataPaths(Collection<String> typeMetadataPaths) {
+ this.typeMetadataPaths = typeMetadataPaths;
+ }
+
+ /**
+ * Set type mapping description.
+ *
+ * @param typeMetadata Type mapping description.
+ */
+ public void setTypeMetadata(Collection<GridCacheQueryTypeMetadata> typeMetadata) {
+ this.typeMetadata = typeMetadata;
+ }
+
+ /**
+ * Get max worker thread count. These threads are responsible for executing queries.
+ *
+ * @return Max worker thread count.
+ */
+ public int getMaxPoolSize() {
+ return maxPoolSz;
+ }
+
+ /**
+ * Set max worker thread count. These threads are responsible for executing queries.
+ *
+ * @param maxPoolSz Max worker thread count.
+ */
+ public void setMaxPoolSize(int maxPoolSz) {
+ this.maxPoolSz = maxPoolSz;
+ }
+
+ /**
+ * Get max query parameters count.
+ *
+ * @return Max query parameters count.
+ */
+ public int getMaxParamsCnt() {
+ return maxParamsCnt;
+ }
+
+ /**
+ * Set max query parameters count.
+ *
+ * @param maxParamsCnt Max query parameters count.
+ */
+ public void setMaxParamsCnt(int maxParamsCnt) {
+ this.maxParamsCnt = maxParamsCnt;
+ }
+
+ /**
+ * Get maximum batch size for put and remove operations.
+ *
+ * @return Maximum batch size.
+ */
+ public int getBatchSize() {
+ return batchSz;
+ }
+
+ /**
+ * Set maximum batch size for put and remove operations.
+ *
+ * @param batchSz Maximum batch size.
+ */
+ public void setBatchSize(int batchSz) {
+ this.batchSz = batchSz;
+ }
}