You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/26 07:50:14 UTC
[20/50] [abbrv] ignite git commit: IGNITE-1753 Refactored usages of
deprecated CacheTypeMetadata to JdbcType.
IGNITE-1753 Refactored usages of deprecated CacheTypeMetadata to JdbcType.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d71f6129
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d71f6129
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d71f6129
Branch: refs/heads/ignite-1537
Commit: d71f6129bc737539e61206c391fc25c776f36242
Parents: 19d2dd0
Author: AKuznetsov <ak...@gridgain.com>
Authored: Mon Nov 23 18:20:50 2015 +0700
Committer: AKuznetsov <ak...@gridgain.com>
Committed: Mon Nov 23 18:20:50 2015 +0700
----------------------------------------------------------------------
examples/schema-import/bin/db-init.sql | 3 +-
.../org/apache/ignite/schema/CacheConfig.java | 7 +-
.../java/org/apache/ignite/schema/Demo.java | 20 +-
.../org/apache/ignite/cache/QueryIndex.java | 53 +-
.../store/jdbc/CacheAbstractJdbcStore.java | 638 ++++++++++++-------
.../store/jdbc/CacheJdbcBlobStoreFactory.java | 14 +-
.../cache/store/jdbc/CacheJdbcPojoStore.java | 444 +++++++++----
.../store/jdbc/CacheJdbcPojoStoreFactory.java | 277 +++++++-
.../ignite/cache/store/jdbc/JdbcType.java | 255 ++++++++
.../cache/store/jdbc/JdbcTypeDefaultHasher.java | 43 ++
.../ignite/cache/store/jdbc/JdbcTypeField.java | 172 +++++
.../ignite/cache/store/jdbc/JdbcTypeHasher.java | 34 +
.../processors/query/GridQueryProcessor.java | 6 +-
.../ignite/internal/visor/cache/VisorCache.java | 4 +-
.../CacheJdbcPojoStoreAbstractSelfTest.java | 395 ++++++++++++
...dbcPojoStoreOptimizedMarshallerSelfTest.java | 31 +
...JdbcPojoStorePortableMarshallerSelfTest.java | 85 +++
.../store/jdbc/CacheJdbcPojoStoreTest.java | 200 +++---
...eJdbcStoreAbstractMultithreadedSelfTest.java | 2 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 6 +-
modules/schema-import/README.txt | 176 ++---
.../ignite/schema/generator/CodeGenerator.java | 198 +++---
.../ignite/schema/generator/XmlGenerator.java | 101 +--
.../apache/ignite/schema/model/IndexItem.java | 54 --
.../ignite/schema/model/PojoDescriptor.java | 72 +--
.../ignite/schema/model/SchemaDescriptor.java | 6 +-
.../schema/parser/DatabaseMetadataParser.java | 12 +-
.../apache/ignite/schema/parser/DbTable.java | 37 +-
.../parser/dialect/DatabaseMetadataDialect.java | 32 +-
.../parser/dialect/JdbcMetadataDialect.java | 22 +-
.../parser/dialect/OracleMetadataDialect.java | 24 +-
.../apache/ignite/schema/ui/ModalDialog.java | 6 +-
.../ignite/schema/ui/SchemaImportApp.java | 13 +-
.../schema/test/AbstractSchemaImportTest.java | 4 +-
.../schema/test/model/ignite-type-metadata.xml | 610 +++++++++---------
.../yardstick/config/ignite-store-config.xml | 50 +-
36 files changed, 2844 insertions(+), 1262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/examples/schema-import/bin/db-init.sql
----------------------------------------------------------------------
diff --git a/examples/schema-import/bin/db-init.sql b/examples/schema-import/bin/db-init.sql
index f02236a..8a91a6a 100644
--- a/examples/schema-import/bin/db-init.sql
+++ b/examples/schema-import/bin/db-init.sql
@@ -17,7 +17,8 @@
-- Script of database initialization for Schema Import Demo.
drop table PERSON;
-create table PERSON(id integer not null, first_name varchar(50), last_name varchar(50), salary double not null, PRIMARY KEY(id));
+
+create table PERSON(id integer not null PRIMARY KEY, first_name varchar(50), last_name varchar(50), salary double not null);
insert into PERSON(id, first_name, last_name, salary) values(1, 'Johannes', 'Kepler', 1000);
insert into PERSON(id, first_name, last_name, salary) values(2, 'Galileo', 'Galilei', 2000);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/examples/schema-import/src/main/java/org/apache/ignite/schema/CacheConfig.java
----------------------------------------------------------------------
diff --git a/examples/schema-import/src/main/java/org/apache/ignite/schema/CacheConfig.java b/examples/schema-import/src/main/java/org/apache/ignite/schema/CacheConfig.java
index cb316c5..c5801cc 100644
--- a/examples/schema-import/src/main/java/org/apache/ignite/schema/CacheConfig.java
+++ b/examples/schema-import/src/main/java/org/apache/ignite/schema/CacheConfig.java
@@ -17,8 +17,7 @@
package org.apache.ignite.schema;
-import javax.cache.configuration.Factory;
-import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory;
import org.apache.ignite.configuration.CacheConfiguration;
/**
@@ -31,7 +30,7 @@ public class CacheConfig {
* @param name Cache name.
* @param storeFactory Cache store factory.
*/
- public static <K, V> CacheConfiguration<K, V> cache(String name, Factory<CacheStore<K, V>> storeFactory) {
+ public static <K, V> CacheConfiguration<K, V> cache(String name, CacheJdbcPojoStoreFactory<K, V> storeFactory) {
throw new IllegalStateException("Please run Ignite Schema Import Utility as described in README.txt");
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/examples/schema-import/src/main/java/org/apache/ignite/schema/Demo.java
----------------------------------------------------------------------
diff --git a/examples/schema-import/src/main/java/org/apache/ignite/schema/Demo.java b/examples/schema-import/src/main/java/org/apache/ignite/schema/Demo.java
index cade7f1..a981f5a 100644
--- a/examples/schema-import/src/main/java/org/apache/ignite/schema/Demo.java
+++ b/examples/schema-import/src/main/java/org/apache/ignite/schema/Demo.java
@@ -18,13 +18,13 @@
package org.apache.ignite.schema;
import javax.cache.Cache;
-import javax.cache.configuration.Factory;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory;
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.h2.jdbcx.JdbcConnectionPool;
@@ -38,16 +38,14 @@ import org.h2.jdbcx.JdbcConnectionPool;
*/
public class Demo {
/**
- * Constructs and returns a fully configured instance of a {@link CacheJdbcPojoStore}.
+ * Constructs and returns a fully configured instance of a {@link CacheJdbcPojoStoreFactory}.
*/
- private static class H2DemoStoreFactory<K, V> implements Factory<CacheStore<K, V>> {
- /** {@inheritDoc} */
- @Override public CacheStore<K, V> create() {
- CacheJdbcPojoStore<K, V> store = new CacheJdbcPojoStore<>();
+ private static class H2DemoStoreFactory<K, V> extends CacheJdbcPojoStoreFactory<K, V> {
+ /** Default constructor. */
+ H2DemoStoreFactory() {
+ setDialect(new H2Dialect());
- store.setDataSource(JdbcConnectionPool.create("jdbc:h2:tcp://localhost/~/schema-import/demo", "sa", ""));
-
- return store;
+ setDataSource(JdbcConnectionPool.create("jdbc:h2:tcp://localhost/~/schema-import/demo", "sa", ""));
}
}
@@ -144,4 +142,4 @@ public class Demo {
System.out.println(">>> Updated person: " + cache.get(key));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java
index f12044d..af11999 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java
@@ -50,20 +50,33 @@ public class QueryIndex implements Serializable {
/**
* Creates single-field sorted ascending index.
*
- * @param name Field name.
+ * @param field Field name.
*/
- public QueryIndex(String name) {
- this(name, QueryIndexType.SORTED, true);
+ public QueryIndex(String field) {
+ this(field, QueryIndexType.SORTED, true);
}
/**
* Creates single-field sorted index.
*
- * @param name Field name.
+ * @param field Field name.
* @param asc Ascending flag.
*/
- public QueryIndex(String name, boolean asc) {
- this(name, QueryIndexType.SORTED, asc);
+ public QueryIndex(String field, boolean asc) {
+ this(field, QueryIndexType.SORTED, asc);
+ }
+
+ /**
+ * Creates single-field sorted index.
+ *
+ * @param field Field name.
+ * @param asc Ascending flag.
+ * @param name Index name.
+ */
+ public QueryIndex(String field, boolean asc, String name) {
+ this(field, QueryIndexType.SORTED, asc);
+
+ this.name = name;
}
/**
@@ -71,14 +84,20 @@ public class QueryIndex implements Serializable {
* If index is sorted, then ascending sorting is used by default.
* To specify sort order, use the next method.
* This constructor should also have a corresponding setter method.
+ *
+ * @param field Field name.
+ * @param type Index type.
*/
public QueryIndex(String field, QueryIndexType type) {
this(Arrays.asList(field), type);
}
/**
- * Creates index for one field. The last boolean parameter
- * is ignored for non-sorted indexes.
+ * Creates index for one field. The last boolean parameter is ignored for non-sorted indexes.
+ *
+ * @param field Field name.
+ * @param type Index type.
+ * @param asc Ascending flag.
*/
public QueryIndex(String field, QueryIndexType type, boolean asc) {
fields = new LinkedHashMap<>();
@@ -88,6 +107,22 @@ public class QueryIndex implements Serializable {
}
/**
+ * Creates index for one field. The last boolean parameter is ignored for non-sorted indexes.
+ *
+ * @param field Field name.
+ * @param type Index type.
+ * @param asc Ascending flag.
+ * @param name Index name.
+ */
+ public QueryIndex(String field, QueryIndexType type, boolean asc, String name) {
+ fields = new LinkedHashMap<>();
+ fields.put(field, asc);
+
+ this.type = type;
+ this.name = name;
+ }
+
+ /**
* Creates index for a collection of fields. If index is sorted, fields will be sorted in
* ascending order.
*
@@ -189,4 +224,4 @@ public class QueryIndex implements Serializable {
public void setIndexType(QueryIndexType type) {
this.type = type;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index 6e19234..6dc413b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -30,6 +30,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -66,6 +68,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.marshaller.portable.BinaryMarshaller;
import org.apache.ignite.resources.CacheStoreSessionResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
@@ -75,6 +78,10 @@ import org.jetbrains.annotations.Nullable;
import static java.sql.Statement.EXECUTE_FAILED;
import static java.sql.Statement.SUCCESS_NO_INFO;
+import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_BATCH_SIZE;
+import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_WRITE_ATTEMPTS;
+import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+
/**
* Implementation of {@link CacheStore} backed by JDBC.
* <p>
@@ -99,35 +106,43 @@ import static java.sql.Statement.SUCCESS_NO_INFO;
* <h2 class="header">Java Example</h2>
* <pre name="code" class="java">
* ...
- * CacheConfiguration ccfg = new CacheConfiguration<>();
- *
- * // Configure cache store.
- * ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(ConfigurationSnippet.store()));
+ * // Create store factory.
+ * CacheJdbcPojoStoreFactory storeFactory = new CacheJdbcPojoStoreFactory();
+ * storeFactory.setDataSourceBean("your_data_source_name");
+ * storeFactory.setDialect(new H2Dialect());
+ * storeFactory.setTypes(array_with_your_types);
+ * ...
+ * ccfg.setCacheStoreFactory(storeFactory);
* ccfg.setReadThrough(true);
* ccfg.setWriteThrough(true);
*
- * // Configure cache types metadata.
- * ccfg.setTypeMetadata(ConfigurationSnippet.typeMetadata());
- *
* cfg.setCacheConfiguration(ccfg);
* ...
* </pre>
*/
public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, LifecycleAware {
- /** Max attempt write count. */
- protected static final int MAX_ATTEMPT_WRITE_COUNT = 2;
-
- /** Default batch size for put and remove operations. */
- protected static final int DFLT_BATCH_SIZE = 512;
-
- /** Default batch size for put and remove operations. */
- protected static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD = 512;
-
/** Connection attribute property name. */
protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
- /** Empty column value. */
- protected static final Object[] EMPTY_COLUMN_VALUE = new Object[] { null };
+ /** Built in Java types names. */
+ protected static final Collection<String> BUILT_IN_TYPES = new HashSet<>();
+
+ static {
+ BUILT_IN_TYPES.add("java.math.BigDecimal");
+ BUILT_IN_TYPES.add("java.lang.Boolean");
+ BUILT_IN_TYPES.add("java.lang.Byte");
+ BUILT_IN_TYPES.add("java.lang.Character");
+ BUILT_IN_TYPES.add("java.lang.Double");
+ BUILT_IN_TYPES.add("java.util.Date");
+ BUILT_IN_TYPES.add("java.sql.Date");
+ BUILT_IN_TYPES.add("java.lang.Float");
+ BUILT_IN_TYPES.add("java.lang.Integer");
+ BUILT_IN_TYPES.add("java.lang.Long");
+ BUILT_IN_TYPES.add("java.lang.Short");
+ BUILT_IN_TYPES.add("java.lang.String");
+ BUILT_IN_TYPES.add("java.sql.Timestamp");
+ BUILT_IN_TYPES.add("java.util.UUID");
+ }
/** Auto-injected store session. */
@CacheStoreSessionResource
@@ -135,7 +150,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
/** Auto injected ignite instance. */
@IgniteInstanceResource
- private Ignite ignite;
+ protected Ignite ignite;
/** Auto-injected logger instance. */
@LoggerResource
@@ -151,30 +166,40 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
/** Cache with entry mapping description. (cache name, (key id, mapping description)). */
protected volatile Map<String, Map<Object, EntryMapping>> cacheMappings = Collections.emptyMap();
+ /** Maximum batch size for writeAll and deleteAll operations. */
+ private int batchSize = DFLT_BATCH_SIZE;
+
/** Database dialect. */
protected JdbcDialect dialect;
- /** Max workers thread count. These threads are responsible for load cache. */
- private int maxPoolSz = Runtime.getRuntime().availableProcessors();
+ /** Maximum write attempts in case of database error. */
+ private int maxWrtAttempts = DFLT_WRITE_ATTEMPTS;
- /** Maximum batch size for writeAll and deleteAll operations. */
- private int batchSz = DFLT_BATCH_SIZE;
+ /** Max workers thread count. These threads are responsible for load cache. */
+ private int maxPoolSize = Runtime.getRuntime().availableProcessors();
/** Parallel load cache minimum threshold. If {@code 0} then load sequentially. */
private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+ /** Types that store could process. */
+ private JdbcType[] types;
+
+ /** Hash calculator. */
+ protected JdbcTypeHasher hasher = JdbcTypeDefaultHasher.INSTANCE;
+
/**
* Get field value from object for use as query parameter.
*
* @param cacheName Cache name.
* @param typeName Type name.
+ * @param typeKind Type kind.
* @param fieldName Field name.
* @param obj Cache object.
* @return Field value from object.
* @throws CacheException in case of error.
*/
- @Nullable protected abstract Object extractParameter(@Nullable String cacheName, String typeName, String fieldName,
- Object obj) throws CacheException;
+ @Nullable protected abstract Object extractParameter(@Nullable String cacheName, String typeName, TypeKind typeKind,
+ String fieldName, Object obj) throws CacheException;
/**
* Construct object from query result.
@@ -182,33 +207,36 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
* @param <R> Type of result object.
* @param cacheName Cache name.
* @param typeName Type name.
- * @param fields Fields descriptors.
+ * @param typeKind Type kind.
+ * @param flds Fields descriptors.
+ * @param hashFlds Field names for hash code calculation.
* @param loadColIdxs Select query columns index.
* @param rs ResultSet.
* @return Constructed object.
* @throws CacheLoaderException If failed to construct cache object.
*/
- protected abstract <R> R buildObject(@Nullable String cacheName, String typeName,
- Collection<CacheTypeFieldMetadata> fields, Map<String, Integer> loadColIdxs, ResultSet rs)
+ protected abstract <R> R buildObject(@Nullable String cacheName, String typeName, TypeKind typeKind,
+ JdbcTypeField[] flds, Collection<String> hashFlds, Map<String, Integer> loadColIdxs, ResultSet rs)
throws CacheLoaderException;
/**
- * Extract key type id from key object.
+ * Calculate type ID for object.
*
- * @param key Key object.
- * @return Key type id.
- * @throws CacheException If failed to get type key id from object.
+ * @param obj Object to calculate type ID for.
+ * @return Type ID.
+ * @throws CacheException If failed to calculate type ID for given object.
*/
- protected abstract Object keyTypeId(Object key) throws CacheException;
+ protected abstract Object typeIdForObject(Object obj) throws CacheException;
/**
- * Extract key type id from key class name.
+ * Calculate type ID for given type name.
*
- * @param type String description of key type.
- * @return Key type id.
- * @throws CacheException If failed to get type key id from object.
+ * @param kind If {@code true} then calculate type ID for POJO otherwise for binary object .
+ * @param typeName String description of type name.
+ * @return Type ID.
+ * @throws CacheException If failed to get type ID for given type name.
*/
- protected abstract Object keyTypeId(String type) throws CacheException;
+ protected abstract Object typeIdForTypeName(TypeKind kind, String typeName) throws CacheException;
/**
* Prepare internal store specific builders for provided types metadata.
@@ -217,7 +245,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
* @param types Collection of types.
* @throws CacheException If failed to prepare internal builders for types.
*/
- protected abstract void prepareBuilders(@Nullable String cacheName, Collection<CacheTypeMetadata> types)
+ protected abstract void prepareBuilders(@Nullable String cacheName, Collection<JdbcType> types)
throws CacheException;
/**
@@ -480,23 +508,23 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
? em.loadCacheQry
: em.loadCacheRangeQuery(lowerBound != null, upperBound != null));
- int ix = 1;
+ int idx = 1;
if (lowerBound != null)
for (int i = lowerBound.length; i > 0; i--)
for (int j = 0; j < i; j++)
- stmt.setObject(ix++, lowerBound[j]);
+ stmt.setObject(idx++, lowerBound[j]);
if (upperBound != null)
for (int i = upperBound.length; i > 0; i--)
for (int j = 0; j < i; j++)
- stmt.setObject(ix++, upperBound[j]);
+ stmt.setObject(idx++, upperBound[j]);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
- K key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
- V val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+ K key = buildObject(em.cacheName, em.keyType(), em.keyKind(), em.keyColumns(), em.keyCols, em.loadColIdxs, rs);
+ V val = buildObject(em.cacheName, em.valueType(), em.valueKind(), em.valueColumns(), null, em.loadColIdxs, rs);
clo.apply(key, val);
}
@@ -527,58 +555,86 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
/**
- * Object is a simple type.
+ * Checks if type configured properly.
*
- * @param cls Class.
- * @return {@code True} if object is a simple type.
- */
- protected static boolean simpleType(Class<?> cls) {
- return (Number.class.isAssignableFrom(cls) || String.class.isAssignableFrom(cls) ||
- java.util.Date.class.isAssignableFrom(cls) || Boolean.class.isAssignableFrom(cls) ||
- UUID.class.isAssignableFrom(cls));
- }
-
- /**
* @param cacheName Cache name to check mapping for.
- * @param clsName Class name.
- * @param fields Fields descriptors.
- * @throws CacheException If failed to check type metadata.
+ * @param typeName Type name.
+ * @param flds Fields descriptors.
+ * @throws CacheException If failed to check type configuration.
*/
- private static void checkMapping(@Nullable String cacheName, String clsName,
- Collection<CacheTypeFieldMetadata> fields) throws CacheException {
+ private void checkTypeConfiguration(@Nullable String cacheName, TypeKind kind, String typeName,
+ JdbcTypeField[] flds) throws CacheException {
try {
- Class<?> cls = Class.forName(clsName);
-
- if (simpleType(cls)) {
- if (fields.size() != 1)
- throw new CacheException("More than one field for simple type [cache name=" + cacheName
- + ", type=" + clsName + " ]");
+ if (kind == TypeKind.BUILT_IN) {
+ if (flds.length != 1)
+ throw new CacheException("More than one field for built in type [cache=" + U.maskName(cacheName) +
+ ", type=" + typeName + " ]");
- CacheTypeFieldMetadata field = F.first(fields);
+ JdbcTypeField field = flds[0];
- if (field.getDatabaseName() == null)
- throw new CacheException("Missing database name in mapping description [cache name=" + cacheName
- + ", type=" + clsName + " ]");
+ if (field.getDatabaseFieldName() == null)
+ throw new CacheException("Missing database name in mapping description [cache=" +
+ U.maskName(cacheName) + ", type=" + typeName + " ]");
- field.setJavaType(cls);
+ field.setJavaFieldType(Class.forName(typeName));
}
else
- for (CacheTypeFieldMetadata field : fields) {
- if (field.getDatabaseName() == null)
- throw new CacheException("Missing database name in mapping description [cache name=" + cacheName
- + ", type=" + clsName + " ]");
-
- if (field.getJavaName() == null)
- throw new CacheException("Missing field name in mapping description [cache name=" + cacheName
- + ", type=" + clsName + " ]");
-
- if (field.getJavaType() == null)
- throw new CacheException("Missing field type in mapping description [cache name=" + cacheName
- + ", type=" + clsName + " ]");
+ for (JdbcTypeField field : flds) {
+ if (field.getDatabaseFieldName() == null)
+ throw new CacheException("Missing database name in mapping description [cache=" +
+ U.maskName(cacheName) + ", type=" + typeName + " ]");
+
+ if (field.getJavaFieldName() == null)
+ throw new CacheException("Missing field name in mapping description [cache=" +
+ U.maskName(cacheName) + ", type=" + typeName + " ]");
+
+ if (field.getJavaFieldType() == null)
+ throw new CacheException("Missing field type in mapping description [cache=" +
+ U.maskName(cacheName) + ", type=" + typeName + " ]");
}
}
catch (ClassNotFoundException e) {
- throw new CacheException("Failed to find class: " + clsName, e);
+ throw new CacheException("Failed to find class: " + typeName, e);
+ }
+ }
+
+ /**
+ * For backward compatibility translate old field type descriptors to new format.
+ *
+ * @param oldFlds Fields in old format.
+ * @return Fields in new format.
+ */
+ @Deprecated
+ private JdbcTypeField[] translateFields(Collection<CacheTypeFieldMetadata> oldFlds) {
+ JdbcTypeField[] newFlds = new JdbcTypeField[oldFlds.size()];
+
+ int idx = 0;
+
+ for (CacheTypeFieldMetadata oldField : oldFlds) {
+ newFlds[idx] = new JdbcTypeField(oldField.getDatabaseType(), oldField.getDatabaseName(),
+ oldField.getJavaType(), oldField.getJavaName());
+
+ idx++;
+ }
+
+ return newFlds;
+ }
+
+ /**
+ * @param type Type name to check.
+ * @return {@code True} if class not found.
+ */
+ protected TypeKind kindForName(String type) {
+ if (BUILT_IN_TYPES.contains(type))
+ return TypeKind.BUILT_IN;
+
+ try {
+ Class.forName(type);
+
+ return TypeKind.POJO;
+ }
+ catch(ClassNotFoundException ignored) {
+ return TypeKind.BINARY;
}
}
@@ -587,46 +643,104 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
* @return Type mappings for specified cache name.
* @throws CacheException If failed to initialize cache mappings.
*/
- private Map<Object, EntryMapping> cacheMappings(@Nullable String cacheName) throws CacheException {
+ private Map<Object, EntryMapping> getOrCreateCacheMappings(@Nullable String cacheName) throws CacheException {
Map<Object, EntryMapping> entryMappings = cacheMappings.get(cacheName);
if (entryMappings != null)
return entryMappings;
cacheMappingsLock.lock();
-
try {
entryMappings = cacheMappings.get(cacheName);
if (entryMappings != null)
return entryMappings;
- CacheConfiguration ccfg = ignite().cache(cacheName).getConfiguration(CacheConfiguration.class);
+ // If no types configured, check CacheTypeMetadata for backward compatibility.
+ if (types == null) {
+ CacheConfiguration ccfg = ignite.cache(cacheName).getConfiguration(CacheConfiguration.class);
+
+ Collection<CacheTypeMetadata> oldTypes = ccfg.getTypeMetadata();
+
+ types = new JdbcType[oldTypes.size()];
+
+ int idx = 0;
+
+ for (CacheTypeMetadata oldType : oldTypes) {
+ JdbcType newType = new JdbcType();
- Collection<CacheTypeMetadata> types = ccfg.getTypeMetadata();
+ newType.setCacheName(cacheName);
- entryMappings = U.newHashMap(types.size());
+ newType.setDatabaseSchema(oldType.getDatabaseSchema());
+ newType.setDatabaseTable(oldType.getDatabaseTable());
- for (CacheTypeMetadata type : types) {
- Object keyTypeId = keyTypeId(type.getKeyType());
+ newType.setKeyType(oldType.getKeyType());
+ newType.setKeyFields(translateFields(oldType.getKeyFields()));
- if (entryMappings.containsKey(keyTypeId))
- throw new CacheException("Key type must be unique in type metadata [cache name=" + cacheName +
- ", key type=" + type.getKeyType() + "]");
+ newType.setValueType(oldType.getValueType());
+ newType.setValueFields(translateFields(oldType.getValueFields()));
- checkMapping(cacheName, type.getKeyType(), type.getKeyFields());
- checkMapping(cacheName, type.getValueType(), type.getValueFields());
+ types[idx] = newType;
- entryMappings.put(keyTypeId(type.getKeyType()), new EntryMapping(cacheName, dialect, type));
+ idx++;
+ }
}
- Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings);
+ List<JdbcType> cacheTypes = new ArrayList<>(types.length);
+
+ for (JdbcType type : types)
+ if ((cacheName != null && cacheName.equals(type.getCacheName())) ||
+ (cacheName == null && type.getCacheName() == null))
+ cacheTypes.add(type);
+
+ entryMappings = U.newHashMap(cacheTypes.size());
+
+ if (!cacheTypes.isEmpty()) {
+ boolean binarySupported = ignite.configuration().getMarshaller() instanceof BinaryMarshaller;
+
+ for (JdbcType type : cacheTypes) {
+ String keyType = type.getKeyType();
+ String valType = type.getValueType();
+
+ TypeKind keyKind = kindForName(keyType);
+
+ if (!binarySupported && keyKind == TypeKind.BINARY)
+ throw new CacheException("Key type has no class [cache=" + U.maskName(cacheName) +
+ ", type=" + keyType + "]");
+
+ checkTypeConfiguration(cacheName, keyKind, keyType, type.getKeyFields());
+
+ Object keyTypeId = typeIdForTypeName(keyKind, keyType);
- mappings.put(cacheName, entryMappings);
+ if (entryMappings.containsKey(keyTypeId))
+ throw new CacheException("Key type must be unique in type metadata [cache=" +
+ U.maskName(cacheName) + ", type=" + keyType + "]");
- prepareBuilders(cacheName, types);
+ TypeKind valKind = kindForName(valType);
- cacheMappings = mappings;
+ checkTypeConfiguration(cacheName, valKind, valType, type.getValueFields());
+
+ entryMappings.put(keyTypeId, new EntryMapping(cacheName, dialect, type, keyKind, valKind));
+
+ // Add one more binding to binary typeId for POJOs,
+ // because object could be passed to store in binary format.
+ if (binarySupported && keyKind == TypeKind.POJO) {
+ keyTypeId = typeIdForTypeName(TypeKind.BINARY, keyType);
+
+ valKind = valKind == TypeKind.POJO ? TypeKind.BINARY : valKind;
+
+ entryMappings.put(keyTypeId, new EntryMapping(cacheName, dialect, type, TypeKind.BINARY, valKind));
+ }
+ }
+
+ Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings);
+
+ mappings.put(cacheName, entryMappings);
+
+ prepareBuilders(cacheName, cacheTypes);
+
+ cacheMappings = mappings;
+ }
return entryMappings;
}
@@ -637,19 +751,21 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
/**
* @param cacheName Cache name.
- * @param keyTypeId Key type id.
- * @param key Key object.
+ * @param typeId Type id.
* @return Entry mapping.
* @throws CacheException If mapping for key was not found.
*/
- private EntryMapping entryMapping(String cacheName, Object keyTypeId, Object key) throws CacheException {
- EntryMapping em = cacheMappings(cacheName).get(keyTypeId);
+ private EntryMapping entryMapping(String cacheName, Object typeId) throws CacheException {
+ Map<Object, EntryMapping> mappings = getOrCreateCacheMappings(cacheName);
+
+ EntryMapping em = mappings.get(typeId);
if (em == null) {
String maskedCacheName = U.maskName(cacheName);
- throw new CacheException("Failed to find mapping description [key=" + key +
- ", cache=" + maskedCacheName + "]. Please configure CacheTypeMetadata to associate '" + maskedCacheName + "' with JdbcPojoStore.");
+ throw new CacheException("Failed to find mapping description [cache=" + maskedCacheName +
+ ", typeId=" + typeId + "]. Please configure JdbcType to associate cache '" + maskedCacheName +
+ "' with JdbcPojoStore.");
}
return em;
@@ -663,34 +779,37 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
String cacheName = session().cacheName();
try {
- pool = Executors.newFixedThreadPool(maxPoolSz);
+ pool = Executors.newFixedThreadPool(maxPoolSize);
Collection<Future<?>> futs = new ArrayList<>();
+ Map<Object, EntryMapping> mappings = getOrCreateCacheMappings(cacheName);
+
if (args != null && args.length > 0) {
if (args.length % 2 != 0)
throw new CacheLoaderException("Expected even number of arguments, but found: " + args.length);
if (log.isDebugEnabled())
- log.debug("Start loading entries from db using user queries from arguments");
+ log.debug("Start loading entries from db using user queries from arguments...");
for (int i = 0; i < args.length; i += 2) {
String keyType = args[i].toString();
String selQry = args[i + 1].toString();
- EntryMapping em = entryMapping(cacheName, keyTypeId(keyType), keyType);
+ EntryMapping em = entryMapping(cacheName, typeIdForTypeName(kindForName(keyType), keyType));
futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo)));
}
}
else {
- Collection<EntryMapping> entryMappings = cacheMappings(session().cacheName()).values();
+ Collection<EntryMapping> entryMappings = mappings.values();
for (EntryMapping em : entryMappings) {
if (parallelLoadCacheMinThreshold > 0) {
- log.debug("Multithread loading entries from db [cache name=" + cacheName +
- ", key type=" + em.keyType() + " ]");
+ if (log.isDebugEnabled())
+ log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + " ]");
Connection conn = null;
@@ -738,8 +857,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
else {
if (log.isDebugEnabled())
- log.debug("Single thread loading entries from db [cache name=" + cacheName +
- ", key type=" + em.keyType() + " ]");
+ log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + " ]");
futs.add(pool.submit(loadCacheFull(em, clo)));
}
@@ -750,10 +869,10 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
U.get(fut);
if (log.isDebugEnabled())
- log.debug("Cache loaded from db: " + cacheName);
+ log.debug("Cache loaded from db: " + U.maskName(cacheName));
}
catch (IgniteCheckedException e) {
- throw new CacheLoaderException("Failed to load cache: " + cacheName, e.getCause());
+ throw new CacheLoaderException("Failed to load cache: " + U.maskName(cacheName), e.getCause());
}
finally {
U.shutdownNow(getClass(), pool, log);
@@ -764,7 +883,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
@Nullable @Override public V load(K key) throws CacheLoaderException {
assert key != null;
- EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key);
+ EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key));
if (log.isDebugEnabled())
log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + "]");
@@ -783,7 +902,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
ResultSet rs = stmt.executeQuery();
if (rs.next())
- return buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+ return buildObject(em.cacheName, em.valueType(), em.valueKind(), em.valueColumns(), null, em.loadColIdxs, rs);
}
catch (SQLException e) {
throw new CacheLoaderException("Failed to load object [table=" + em.fullTableName() +
@@ -807,14 +926,14 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
String cacheName = session().cacheName();
- Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(cacheName).size());
+ Map<Object, LoadWorker<K, V>> workers = U.newHashMap(getOrCreateCacheMappings(cacheName).size());
Map<K, V> res = new HashMap<>();
for (K key : keys) {
- Object keyTypeId = keyTypeId(key);
+ Object keyTypeId = typeIdForObject(key);
- EntryMapping em = entryMapping(cacheName, keyTypeId, key);
+ EntryMapping em = entryMapping(cacheName, keyTypeId);
LoadWorker<K, V> worker = workers.get(keyTypeId);
@@ -852,7 +971,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
try {
CacheWriterException we = null;
- for (int attempt = 0; attempt < MAX_ATTEMPT_WRITE_COUNT; attempt++) {
+ for (int attempt = 0; attempt < maxWrtAttempts; attempt++) {
int paramIdx = fillValueParameters(updStmt, 1, em, entry.getValue());
fillKeyParameters(updStmt, paramIdx, em, entry.getKey());
@@ -921,7 +1040,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
K key = entry.getKey();
- EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key);
+ EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key));
if (log.isDebugEnabled())
log.debug("Start write entry to database [table=" + em.fullTableName() + ", entry=" + entry + "]");
@@ -937,9 +1056,9 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
try {
stmt = conn.prepareStatement(em.mergeQry);
- int i = fillKeyParameters(stmt, em, key);
+ int idx = fillKeyParameters(stmt, em, key);
- fillValueParameters(stmt, i, em, entry.getValue());
+ fillValueParameters(stmt, idx, em, entry.getValue());
int updCnt = stmt.executeUpdate();
@@ -1010,15 +1129,15 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
for (Cache.Entry<? extends K, ? extends V> entry : entries) {
K key = entry.getKey();
- Object keyTypeId = keyTypeId(key);
+ Object keyTypeId = typeIdForObject(key);
- em = entryMapping(cacheName, keyTypeId, key);
+ em = entryMapping(cacheName, keyTypeId);
if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
if (mergeStmt != null) {
if (log.isDebugEnabled())
- log.debug("Write entries to db [cache name=" + cacheName +
- ", key type=" + em.keyType() + ", count=" + prepared + "]");
+ log.debug("Write entries to db [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + ", cnt=" + prepared + "]");
executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
@@ -1034,16 +1153,16 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
prepared = 0;
}
- int i = fillKeyParameters(mergeStmt, em, key);
+ int idx = fillKeyParameters(mergeStmt, em, key);
- fillValueParameters(mergeStmt, i, em, entry.getValue());
+ fillValueParameters(mergeStmt, idx, em, entry.getValue());
mergeStmt.addBatch();
- if (++prepared % batchSz == 0) {
+ if (++prepared % batchSize == 0) {
if (log.isDebugEnabled())
- log.debug("Write entries to db [cache name=" + cacheName +
- ", key type=" + em.keyType() + ", count=" + prepared + "]");
+ log.debug("Write entries to db [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + ", cnt=" + prepared + "]");
executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
@@ -1053,10 +1172,10 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
}
- if (mergeStmt != null && prepared % batchSz != 0) {
+ if (mergeStmt != null && prepared % batchSize != 0) {
if (log.isDebugEnabled())
- log.debug("Write entries to db [cache name=" + cacheName +
- ", key type=" + em.keyType() + ", count=" + prepared + "]");
+ log.debug("Write entries to db [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + ", cnt=" + prepared + "]");
executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
@@ -1067,8 +1186,9 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
}
else {
- log.debug("Write entries to db one by one using update and insert statements [cache name=" +
- cacheName + ", count=" + entries.size() + "]");
+ if (log.isDebugEnabled())
+ log.debug("Write entries to db one by one using update and insert statements [cache=" +
+ U.maskName(cacheName) + ", cnt=" + entries.size() + "]");
PreparedStatement insStmt = null;
@@ -1078,9 +1198,9 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
for (Cache.Entry<? extends K, ? extends V> entry : entries) {
K key = entry.getKey();
- Object keyTypeId = keyTypeId(key);
+ Object keyTypeId = typeIdForObject(key);
- EntryMapping em = entryMapping(cacheName, keyTypeId, key);
+ EntryMapping em = entryMapping(cacheName, keyTypeId);
if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
U.closeQuiet(insStmt);
@@ -1116,7 +1236,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
@Override public void delete(Object key) throws CacheWriterException {
assert key != null;
- EntryMapping em = entryMapping(session().cacheName(), keyTypeId(key), key);
+ EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key));
if (log.isDebugEnabled())
log.debug("Remove value from db [table=" + em.fullTableName() + ", key=" + key + "]");
@@ -1220,9 +1340,9 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
int fromIdx = 0, prepared = 0;
for (Object key : keys) {
- Object keyTypeId = keyTypeId(key);
+ Object keyTypeId = typeIdForObject(key);
- em = entryMapping(cacheName, keyTypeId, key);
+ em = entryMapping(cacheName, keyTypeId);
if (delStmt == null) {
delStmt = conn.prepareStatement(em.remQry);
@@ -1232,8 +1352,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
if (!currKeyTypeId.equals(keyTypeId)) {
if (log.isDebugEnabled())
- log.debug("Delete entries from db [cache name=" + cacheName +
- ", key type=" + em.keyType() + ", count=" + prepared + "]");
+ log.debug("Delete entries from db [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + ", cnt=" + prepared + "]");
executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
@@ -1248,10 +1368,10 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
delStmt.addBatch();
- if (++prepared % batchSz == 0) {
+ if (++prepared % batchSize == 0) {
if (log.isDebugEnabled())
- log.debug("Delete entries from db [cache name=" + cacheName +
- ", key type=" + em.keyType() + ", count=" + prepared + "]");
+ log.debug("Delete entries from db [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + ", cnt=" + prepared + "]");
executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
@@ -1261,10 +1381,10 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
}
- if (delStmt != null && prepared % batchSz != 0) {
+ if (delStmt != null && prepared % batchSize != 0) {
if (log.isDebugEnabled())
- log.debug("Delete entries from db [cache name=" + cacheName +
- ", key type=" + em.keyType() + ", count=" + prepared + "]");
+ log.debug("Delete entries from db [cache=" + U.maskName(cacheName) +
+ ", keyType=" + em.keyType() + ", cnt=" + prepared + "]");
executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
}
@@ -1281,17 +1401,17 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
* Sets the value of the designated parameter using the given object.
*
* @param stmt Prepare statement.
- * @param i Index for parameters.
+ * @param idx Index for parameters.
* @param field Field descriptor.
* @param fieldVal Field value.
* @throws CacheException If failed to set statement parameter.
*/
- protected void fillParameter(PreparedStatement stmt, int i, CacheTypeFieldMetadata field, @Nullable Object fieldVal)
+ protected void fillParameter(PreparedStatement stmt, int idx, JdbcTypeField field, @Nullable Object fieldVal)
throws CacheException {
try {
if (fieldVal != null) {
- if (field.getJavaType() == UUID.class) {
- switch (field.getDatabaseType()) {
+ if (field.getJavaFieldType() == UUID.class) {
+ switch (field.getDatabaseFieldType()) {
case Types.BINARY:
fieldVal = U.uuidToBytes((UUID)fieldVal);
@@ -1304,13 +1424,13 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
}
- stmt.setObject(i, fieldVal);
+ stmt.setObject(idx, fieldVal);
}
else
- stmt.setNull(i, field.getDatabaseType());
+ stmt.setNull(idx, field.getDatabaseFieldType());
}
catch (SQLException e) {
- throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseName(), e);
+ throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseFieldName(), e);
}
}
@@ -1324,8 +1444,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
*/
protected int fillKeyParameters(PreparedStatement stmt, int idx, EntryMapping em,
Object key) throws CacheException {
- for (CacheTypeFieldMetadata field : em.keyColumns()) {
- Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaName(), key);
+ for (JdbcTypeField field : em.keyColumns()) {
+ Object fieldVal = extractParameter(em.cacheName, em.keyType(), em.keyKind(), field.getJavaFieldName(), key);
fillParameter(stmt, idx++, field, fieldVal);
}
@@ -1354,8 +1474,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
*/
protected int fillValueParameters(PreparedStatement stmt, int idx, EntryMapping em, Object val)
throws CacheWriterException {
- for (CacheTypeFieldMetadata field : em.uniqValFields) {
- Object fieldVal = extractParameter(em.cacheName, em.valueType(), field.getJavaName(), val);
+ for (JdbcTypeField field : em.uniqValFlds) {
+ Object fieldVal = extractParameter(em.cacheName, em.valueType(), em.valueKind(), field.getJavaFieldName(), val);
fillParameter(stmt, idx++, field, fieldVal);
}
@@ -1401,16 +1521,70 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
* @return Max workers thread count.
*/
public int getMaximumPoolSize() {
- return maxPoolSz;
+ return maxPoolSize;
}
/**
* Set Max workers thread count. These threads are responsible for execute query.
*
- * @param maxPoolSz Max workers thread count.
+ * @param maxPoolSize Max workers thread count.
*/
- public void setMaximumPoolSize(int maxPoolSz) {
- this.maxPoolSz = maxPoolSz;
+ public void setMaximumPoolSize(int maxPoolSize) {
+ this.maxPoolSize = maxPoolSize;
+ }
+
+ /**
+ * Gets maximum number of write attempts in case of database error.
+ *
+ * @return Maximum number of write attempts.
+ */
+ public int getMaximumWriteAttempts() {
+ return maxWrtAttempts;
+ }
+
+ /**
+ * Sets maximum number of write attempts in case of database error.
+ *
+ * @param maxWrtAttempts Maximum number of write attempts.
+ */
+ public void setMaximumWriteAttempts(int maxWrtAttempts) {
+ this.maxWrtAttempts = maxWrtAttempts;
+ }
+
+ /**
+ * Gets types known by store.
+ *
+ * @return Types known by store.
+ */
+ public JdbcType[] getTypes() {
+ return types;
+ }
+
+ /**
+ * Sets types known by store.
+ *
+ * @param types Types the store should process.
+ */
+ public void setTypes(JdbcType... types) {
+ this.types = types;
+ }
+
+ /**
+ * Gets hash code calculator.
+ *
+ * @return Hash code calculator.
+ */
+ public JdbcTypeHasher getHasher() {
+ return hasher;
+ }
+
+ /**
+ * Sets hash code calculator.
+ *
+ * @param hasher Hash code calculator.
+ */
+ public void setHasher(JdbcTypeHasher hasher) {
+ this.hasher = hasher;
}
/**
@@ -1419,16 +1593,16 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
* @return Maximum batch size.
*/
public int getBatchSize() {
- return batchSz;
+ return batchSize;
}
/**
* Set maximum batch size for write and delete operations.
*
- * @param batchSz Maximum batch size.
+ * @param batchSize Maximum batch size.
*/
- public void setBatchSize(int batchSz) {
- this.batchSz = batchSz;
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
}
/**
@@ -1464,6 +1638,18 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
/**
+ * Type kind.
+ */
+ protected enum TypeKind {
+ /** Type is known as a Java built-in type, like {@link String}. */
+ BUILT_IN,
+ /** Class for this type is available. */
+ POJO,
+ /** Class for this type is not available. */
+ BINARY
+ }
+
+ /**
* Entry mapping description.
*/
protected static class EntryMapping {
@@ -1510,10 +1696,16 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
private final Map<String, Integer> loadColIdxs;
/** Unique value fields. */
- private final Collection<CacheTypeFieldMetadata> uniqValFields;
+ private final Collection<JdbcTypeField> uniqValFlds;
/** Type metadata. */
- private final CacheTypeMetadata typeMeta;
+ private final JdbcType typeMeta;
+
+ /** Key type kind. */
+ private final TypeKind keyKind;
+
+ /** Value type kind. */
+ private final TypeKind valKind;
/** Full table name. */
private final String fullTblName;
@@ -1523,22 +1715,27 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
* @param dialect JDBC dialect.
* @param typeMeta Type metadata.
*/
- public EntryMapping(@Nullable String cacheName, JdbcDialect dialect, CacheTypeMetadata typeMeta) {
+ public EntryMapping(@Nullable String cacheName, JdbcDialect dialect, JdbcType typeMeta,
+ TypeKind keyKind, TypeKind valKind) {
this.cacheName = cacheName;
this.dialect = dialect;
this.typeMeta = typeMeta;
- Collection<CacheTypeFieldMetadata> keyFields = typeMeta.getKeyFields();
+ this.keyKind = keyKind;
+
+ this.valKind = valKind;
+
+ JdbcTypeField[] keyFields = typeMeta.getKeyFields();
- Collection<CacheTypeFieldMetadata> valFields = typeMeta.getValueFields();
+ JdbcTypeField[] valFields = typeMeta.getValueFields();
- keyCols = databaseColumns(keyFields);
+ keyCols = databaseColumns(F.asList(keyFields));
- uniqValFields = F.view(valFields, new IgnitePredicate<CacheTypeFieldMetadata>() {
- @Override public boolean apply(CacheTypeFieldMetadata col) {
- return !keyCols.contains(col.getDatabaseName());
+ uniqValFlds = F.view(F.asList(valFields), new IgnitePredicate<JdbcTypeField>() {
+ @Override public boolean apply(JdbcTypeField col) {
+ return !keyCols.contains(col.getDatabaseFieldName());
}
});
@@ -1548,7 +1745,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
fullTblName = F.isEmpty(schema) ? tblName : schema + "." + tblName;
- Collection<String> uniqValCols = databaseColumns(uniqValFields);
+ Collection<String> uniqValCols = databaseColumns(uniqValFlds);
cols = F.concat(false, keyCols, uniqValCols);
@@ -1579,21 +1776,49 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
/**
- * Extract database column names from {@link CacheTypeFieldMetadata}.
+ * Extract database column names from {@link JdbcTypeField}.
*
- * @param dsc collection of {@link CacheTypeFieldMetadata}.
+ * @param dsc Collection of {@link JdbcTypeField}.
* @return Collection with database column names.
*/
- private static Collection<String> databaseColumns(Collection<CacheTypeFieldMetadata> dsc) {
- return F.transform(dsc, new C1<CacheTypeFieldMetadata, String>() {
+ private static Collection<String> databaseColumns(Collection<JdbcTypeField> dsc) {
+ return F.transform(dsc, new C1<JdbcTypeField, String>() {
/** {@inheritDoc} */
- @Override public String apply(CacheTypeFieldMetadata col) {
- return col.getDatabaseName();
+ @Override public String apply(JdbcTypeField col) {
+ return col.getDatabaseFieldName();
}
});
}
/**
+ * @return Key type.
+ */
+ protected String keyType() {
+ return typeMeta.getKeyType();
+ }
+
+ /**
+ * @return Key type kind.
+ */
+ protected TypeKind keyKind() {
+ return keyKind;
+ }
+
+ /**
+ * @return Value type.
+ */
+ protected String valueType() {
+ return typeMeta.getValueType();
+ }
+
+ /**
+ * @return Value type kind.
+ */
+ protected TypeKind valueKind() {
+ return valKind;
+ }
+
+ /**
* Construct query for select values with key count less or equal {@code maxKeysPerStmt}
*
* @param keyCnt Key count.
@@ -1623,25 +1848,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
/**
- * @return Key type.
- */
- protected String keyType() {
- return typeMeta.getKeyType();
- }
-
- /**
- * @return Value type.
- */
- protected String valueType() {
- return typeMeta.getValueType();
- }
-
- /**
* Gets key columns.
*
* @return Key columns.
*/
- protected Collection<CacheTypeFieldMetadata> keyColumns() {
+ protected JdbcTypeField[] keyColumns() {
return typeMeta.getKeyFields();
}
@@ -1650,7 +1861,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
*
* @return Value columns.
*/
- protected Collection<CacheTypeFieldMetadata> valueColumns() {
+ protected JdbcTypeField[] valueColumns() {
return typeMeta.getValueFields();
}
@@ -1694,8 +1905,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
/** {@inheritDoc} */
@Override public Void call() throws Exception {
if (log.isDebugEnabled())
- log.debug("Load cache using custom query [cache name= " + em.cacheName +
- ", key type=" + em.keyType() + ", query=" + qry + "]");
+ log.debug("Load cache using custom query [cache= " + U.maskName(em.cacheName) +
+ ", keyType=" + em.keyType() + ", query=" + qry + "]");
Connection conn = null;
@@ -1716,8 +1927,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
colIdxs.put(meta.getColumnLabel(i), i);
while (rs.next()) {
- K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), colIdxs, rs);
- V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), colIdxs, rs);
+ K1 key = buildObject(em.cacheName, em.keyType(), em.keyKind(), em.keyColumns(), em.keyCols, colIdxs, rs);
+ V1 val = buildObject(em.cacheName, em.valueType(), em.valueKind(), em.valueColumns(), null, colIdxs, rs);
clo.apply(key, val);
}
@@ -1790,8 +2001,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
/** {@inheritDoc} */
@Override public Map<K1, V1> call() throws Exception {
if (log.isDebugEnabled())
- log.debug("Load values from db [table= " + em.fullTableName() +
- ", key count=" + keys.size() + "]");
+ log.debug("Load values from db [table= " + em.fullTableName() + ", keysCnt=" + keys.size() + "]");
PreparedStatement stmt = null;
@@ -1801,8 +2011,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
int idx = 1;
for (Object key : keys)
- for (CacheTypeFieldMetadata field : em.keyColumns()) {
- Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaName(), key);
+ for (JdbcTypeField field : em.keyColumns()) {
+ Object fieldVal = extractParameter(em.cacheName, em.keyType(), em.keyKind(), field.getJavaFieldName(), key);
fillParameter(stmt, idx++, field, fieldVal);
}
@@ -1812,8 +2022,8 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
Map<K1, V1> entries = U.newHashMap(keys.size());
while (rs.next()) {
- K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
- V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+ K1 key = buildObject(em.cacheName, em.keyType(), em.keyKind(), em.keyColumns(), em.keyCols, em.loadColIdxs, rs);
+ V1 val = buildObject(em.cacheName, em.valueType(), em.valueKind(), em.valueColumns(), null, em.loadColIdxs, rs);
entries.put(key, val);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d71f6129/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStoreFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStoreFactory.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStoreFactory.java
index 74ab30b..6a46619 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStoreFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStoreFactory.java
@@ -35,7 +35,7 @@ import org.apache.ignite.resources.SpringApplicationContextResource;
*
* <h2 class="header">Spring Example</h2>
* <pre name="code" class="xml">
- * <bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
+ * <bean id= "myDataSource" class="org.h2.jdbcx.JdbcDataSource"/>
*
* <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
* ...
@@ -46,7 +46,7 @@ import org.apache.ignite.resources.SpringApplicationContextResource;
* <property name="cacheStoreFactory">
* <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactory">
* <property name="user" value = "Ignite" />
- * <property name="dataSourceBean" value = "simpleDataSource" />
+ * <property name="dataSourceBean" value = "myDataSource" />
* </bean>
* </property>
* </bean>
@@ -99,7 +99,7 @@ public class CacheJdbcBlobStoreFactory<K, V> implements Factory<CacheJdbcBlobSto
/** Application context. */
@SpringApplicationContextResource
- private Object appContext;
+ private Object appCtx;
/** {@inheritDoc} */
@Override public CacheJdbcBlobStore<K, V> create() {
@@ -118,7 +118,7 @@ public class CacheJdbcBlobStoreFactory<K, V> implements Factory<CacheJdbcBlobSto
if (dataSrc != null)
store.setDataSource(dataSrc);
else if (dataSrcBean != null) {
- if (appContext == null)
+ if (appCtx == null)
throw new IgniteException("Spring application context resource is not injected.");
IgniteSpringHelper spring;
@@ -126,13 +126,13 @@ public class CacheJdbcBlobStoreFactory<K, V> implements Factory<CacheJdbcBlobSto
try {
spring = IgniteComponentType.SPRING.create(false);
- DataSource data = spring.loadBeanFromAppContext(appContext, dataSrcBean);
+ DataSource data = spring.loadBeanFromAppContext(appCtx, dataSrcBean);
store.setDataSource(data);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to load bean in application context [beanName=" + dataSrcBean +
- ", igniteConfig=" + appContext + ']');
+ ", igniteConfig=" + appCtx + ']');
}
}
@@ -287,4 +287,4 @@ public class CacheJdbcBlobStoreFactory<K, V> implements Factory<CacheJdbcBlobSto
@Override public String toString() {
return S.toString(CacheJdbcBlobStoreFactory.class, this);
}
-}
\ No newline at end of file
+}