You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/10/26 11:47:53 UTC
[1/2] ignite git commit: IGNITE-1753 Reworking JDBC POJO store to new
configuration. Merging POJO and Portable stores in single store. Fixed
several tests.
Repository: ignite
Updated Branches:
refs/heads/ignite-1753-1282 [created] 8c1a71b28
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
new file mode 100644
index 0000000..b333bc7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.cache.store.jdbc.dialect.*;
+
+import java.io.*;
+
+/**
+ * JDBC POJO store configuration.
+ */
+public class CacheJdbcPojoStoreConfiguration implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Default value for write attempts. */
+ public static final int DFLT_WRITE_ATTEMPTS = 2;
+
+ /** Default batch size for put and remove operations. */
+ public static final int DFLT_BATCH_SIZE = 512;
+
+ /** Default batch size for put and remove operations. */
+ public static final int DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD = 512;
+
+ /** Maximum batch size for writeAll and deleteAll operations. */
+ private int batchSz = DFLT_BATCH_SIZE;
+
+ /** Name of data source bean. */
+ private String dataSrcBean;
+
+ /** Database dialect. */
+ private JdbcDialect dialect;
+
+ /** Max workers thread count. These threads are responsible for load cache. */
+ private int maxPoolSz = Runtime.getRuntime().availableProcessors();
+
+ /** Maximum write attempts in case of database error. */
+ private int maxWrtAttempts = DFLT_WRITE_ATTEMPTS;
+
+ /** Parallel load cache minimum threshold. If {@code 0} then load sequentially. */
+ private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+
+ /** Types that store could process. */
+ private CacheJdbcPojoStoreType[] types;
+
+ /**
+ * Empty constructor (all values are initialized to their defaults).
+ */
+ public CacheJdbcPojoStoreConfiguration() {
+ /* No-op. */
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param cfg Configuration to copy.
+ */
+ public CacheJdbcPojoStoreConfiguration(CacheJdbcPojoStoreConfiguration cfg) {
+ // Order alphabetically for maintenance purposes.
+ batchSz = cfg.getBatchSize();
+ dataSrcBean = cfg.getDataSourceBean();
+ dialect = cfg.getDialect();
+ maxPoolSz = cfg.getMaximumPoolSize();
+ maxWrtAttempts = cfg.getMaximumWriteAttempts();
+ parallelLoadCacheMinThreshold = cfg.getParallelLoadCacheMinimumThreshold();
+ types = cfg.getTypes();
+ }
+
+ /**
+ * Get maximum batch size for delete and delete operations.
+ *
+ * @return Maximum batch size.
+ */
+ public int getBatchSize() {
+ return batchSz;
+ }
+
+ /**
+ * Set maximum batch size for write and delete operations.
+ *
+ * @param batchSz Maximum batch size.
+ * @return {@code This} for chaining.
+ */
+ public CacheJdbcPojoStoreConfiguration setBatchSize(int batchSz) {
+ this.batchSz = batchSz;
+
+ return this;
+ }
+
+ /**
+ * Gets name of the data source bean.
+ *
+ * @return Data source bean name.
+ */
+ public String getDataSourceBean() {
+ return dataSrcBean;
+ }
+
+ /**
+ * Sets name of the data source bean.
+ *
+ * @param dataSrcBean Data source bean name.
+ * @return {@code This} for chaining.
+ */
+ public CacheJdbcPojoStoreConfiguration setDataSourceBean(String dataSrcBean) {
+ this.dataSrcBean = dataSrcBean;
+
+ return this;
+ }
+
+ /**
+ * Get database dialect.
+ *
+ * @return Database dialect.
+ */
+ public JdbcDialect getDialect() {
+ return dialect;
+ }
+
+ /**
+ * Set database dialect.
+ *
+ * @param dialect Database dialect.
+ * @return {@code This} for chaining.
+ */
+ public CacheJdbcPojoStoreConfiguration setDialect(JdbcDialect dialect) {
+ this.dialect = dialect;
+
+ return this;
+ }
+
+ /**
+ * Get maximum workers thread count. These threads are responsible for queries execution.
+ *
+ * @return Maximum workers thread count.
+ */
+ public int getMaximumPoolSize() {
+ return maxPoolSz;
+ }
+
+ /**
+ * Set Maximum workers thread count. These threads are responsible for queries execution.
+ *
+ * @param maxPoolSz Max workers thread count.
+ * @return {@code This} for chaining.
+ */
+ public CacheJdbcPojoStoreConfiguration setMaximumPoolSize(int maxPoolSz) {
+ this.maxPoolSz = maxPoolSz;
+
+ return this;
+ }
+
+ /**
+ * Gets maximum number of write attempts in case of database error.
+ *
+ * @return Maximum number of write attempts.
+ */
+ public int getMaximumWriteAttempts() {
+ return maxWrtAttempts;
+ }
+
+ /**
+ * Sets maximum number of write attempts in case of database error.
+ *
+ * @param maxWrtAttempts Number of write attempts.
+ * @return {@code This} for chaining.
+ */
+ public CacheJdbcPojoStoreConfiguration setMaximumWriteAttempts(int maxWrtAttempts) {
+ this.maxWrtAttempts = maxWrtAttempts;
+
+ return this;
+ }
+
+ /**
+ * Parallel load cache minimum row count threshold.
+ *
+ * @return If {@code 0} then load sequentially.
+ */
+ public int getParallelLoadCacheMinimumThreshold() {
+ return parallelLoadCacheMinThreshold;
+ }
+
+ /**
+ * Parallel load cache minimum row count threshold.
+ *
+ * @param parallelLoadCacheMinThreshold Minimum row count threshold. If {@code 0} then load sequentially.
+ * @return {@code This} for chaining.
+ */
+ public CacheJdbcPojoStoreConfiguration setParallelLoadCacheMinimumThreshold(int parallelLoadCacheMinThreshold) {
+ this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold;
+
+ return this;
+ }
+
+ /**
+ * Gets types known by store.
+ *
+ * @return Types known by store.
+ */
+ public CacheJdbcPojoStoreType[] getTypes() {
+ return types;
+ }
+
+ /**
+ * Sets store configurations.
+ *
+ * @param types Store should process.
+ * @return {@code This} for chaining.
+ */
+ public CacheJdbcPojoStoreConfiguration setTypes(CacheJdbcPojoStoreType... types) {
+ this.types = types;
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
index c90a69b..6d8f8af 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
@@ -61,43 +61,65 @@ public class CacheJdbcPojoStoreFactory<K, V> implements Factory<CacheJdbcPojoSto
/** */
private static final long serialVersionUID = 0L;
+ /** POJO store configuration. */
+ private CacheJdbcPojoStoreConfiguration cfg;
+
/** Name of data source bean. */
+ @Deprecated
private String dataSrcBean;
- /** Data source. */
- private transient DataSource dataSrc;
-
/** Database dialect. */
+ @Deprecated
private JdbcDialect dialect;
+ /** Data source. */
+ private transient DataSource dataSrc;
+
/** Application context. */
@SpringApplicationContextResource
- private transient Object appContext;
+ private transient Object appCtx;
/** {@inheritDoc} */
@Override public CacheJdbcPojoStore<K, V> create() {
CacheJdbcPojoStore<K, V> store = new CacheJdbcPojoStore<>();
- store.setDialect(dialect);
+ // For backward compatibility create store configuration.
+ if (cfg == null) {
+ cfg = new CacheJdbcPojoStoreConfiguration();
+
+ cfg.setDataSourceBean(dataSrcBean);
+ cfg.setDialect(dialect);
+ }
+
+ store.setBatchSize(cfg.getBatchSize());
+ store.setDialect(cfg.getDialect());
+ store.setMaximumPoolSize(cfg.getMaximumPoolSize());
+ store.setMaximumWriteAttempts(cfg.getMaximumWriteAttempts());
+ store.setParallelLoadCacheMinimumThreshold(cfg.getParallelLoadCacheMinimumThreshold());
+ store.setTypes(cfg.getTypes());
if (dataSrc != null)
store.setDataSource(dataSrc);
- else if (dataSrcBean != null) {
- if (appContext == null)
- throw new IgniteException("Spring application context resource is not injected.");
+ else {
+ String dtSrcBean = cfg.getDataSourceBean();
- IgniteSpringHelper spring;
+ if (dtSrcBean != null) {
+ if (appCtx == null)
+ throw new IgniteException("Spring application context resource is not injected.");
- try {
- spring = IgniteComponentType.SPRING.create(false);
+ IgniteSpringHelper spring;
- DataSource data = spring.loadBeanFromAppContext(appContext, dataSrcBean);
+ try {
+ spring = IgniteComponentType.SPRING.create(false);
- store.setDataSource(data);
- }
- catch (Exception e) {
- throw new IgniteException("Failed to load bean in application context [beanName=" + dataSrcBean +
- ", igniteConfig=" + appContext + ']', e);
+ DataSource data = spring.loadBeanFromAppContext(appCtx, dtSrcBean);
+
+ store.setDataSource(data);
+ }
+ catch (Exception e) {
+ throw new IgniteException("Failed to load bean in application context [beanName=" + dtSrcBean +
+ ", igniteConfig=" + appCtx + ']', e);
+ }
}
}
@@ -105,27 +127,27 @@ public class CacheJdbcPojoStoreFactory<K, V> implements Factory<CacheJdbcPojoSto
}
/**
- * Sets name of the data source bean.
+ * Sets store configuration.
*
- * @param dataSrcBean Data source bean name.
+ * @param cfg Configuration to use.
* @return {@code This} for chaining.
- * @see CacheJdbcPojoStore#setDataSource(DataSource)
*/
- public CacheJdbcPojoStoreFactory<K, V> setDataSourceBean(String dataSrcBean) {
- this.dataSrcBean = dataSrcBean;
+ public CacheJdbcPojoStoreFactory<K, V> setConfiguration(CacheJdbcPojoStoreConfiguration cfg) {
+ this.cfg = cfg;
return this;
}
/**
- * Sets data source. Data source should be fully configured and ready-to-use.
+ * Sets name of the data source bean.
*
- * @param dataSrc Data source.
+ * @param dataSrcBean Data source bean name.
* @return {@code This} for chaining.
* @see CacheJdbcPojoStore#setDataSource(DataSource)
*/
- public CacheJdbcPojoStoreFactory<K, V> setDataSource(DataSource dataSrc) {
- this.dataSrc = dataSrc;
+ @Deprecated
+ public CacheJdbcPojoStoreFactory<K, V> setDataSourceBean(String dataSrcBean) {
+ this.dataSrcBean = dataSrcBean;
return this;
}
@@ -136,12 +158,26 @@ public class CacheJdbcPojoStoreFactory<K, V> implements Factory<CacheJdbcPojoSto
* @param dialect Database dialect.
* @see CacheJdbcPojoStore#setDialect(JdbcDialect)
*/
+ @Deprecated
public void setDialect(JdbcDialect dialect) {
this.dialect = dialect;
}
+ /**
+ * Sets data source. Data source should be fully configured and ready-to-use.
+ *
+ * @param dataSrc Data source.
+ * @return {@code This} for chaining.
+ * @see CacheJdbcPojoStore#setDataSource(DataSource)
+ */
+ public CacheJdbcPojoStoreFactory<K, V> setDataSource(DataSource dataSrc) {
+ this.dataSrc = dataSrc;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheJdbcPojoStoreFactory.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
new file mode 100644
index 0000000..e755165
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+
+/**
+ * Description for type that could be stored into database by store.
+ */
+public class CacheJdbcPojoStoreType implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache name. */
+ private String cacheName;
+
+ /** Schema name in database. */
+ private String dbSchema;
+
+ /** Table name in database. */
+ private String dbTbl;
+
+ /** Key class used to store key in cache. */
+ private String keyType;
+
+ /** List of fields descriptors for key object. */
+ @GridToStringInclude
+ private CacheJdbcPojoStoreTypeField[] keyFields;
+
+ /** Value class used to store value in cache. */
+ private String valType;
+
+ /** List of fields descriptors for value object. */
+ @GridToStringInclude
+ private CacheJdbcPojoStoreTypeField[] valFields;
+
+ /** If {@code true} object is stored as IgniteObject. */
+ private boolean keepSerialized;
+
+ /**
+ * Empty constructor (all values are initialized to their defaults).
+ */
+ public CacheJdbcPojoStoreType() {
+ /* No-op. */
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param type Type to copy.
+ */
+ public CacheJdbcPojoStoreType(CacheJdbcPojoStoreType type) {
+ cacheName = type.getCacheName();
+
+ dbSchema = type.getDatabaseSchema();
+ dbTbl = type.getDatabaseTable();
+
+ keyType = type.getKeyType();
+ keyFields = type.getKeyFields();
+
+ valType = type.getValueType();
+ valFields = type.getValueFields();
+
+ keepSerialized = type.isKeepSerialized();
+ }
+
+ /**
+ * Gets associated cache name.
+ *
+ * @return Сache name.
+ */
+ public String getCacheName() {
+ return cacheName;
+ }
+
+ /**
+ * Sets associated cache name.
+ *
+ * @param cacheName Cache name.
+ */
+ public CacheJdbcPojoStoreType setCacheName(String cacheName) {
+ this.cacheName = cacheName;
+
+ return this;
+ }
+
+ /**
+ * Gets database schema name.
+ *
+ * @return Schema name.
+ */
+ public String getDatabaseSchema() {
+ return dbSchema;
+ }
+
+ /**
+ * Sets database schema name.
+ *
+ * @param dbSchema Schema name.
+ */
+ public CacheJdbcPojoStoreType setDatabaseSchema(String dbSchema) {
+ this.dbSchema = dbSchema;
+
+ return this;
+ }
+
+ /**
+ * Gets table name in database.
+ *
+ * @return Table name in database.
+ */
+ public String getDatabaseTable() {
+ return dbTbl;
+ }
+
+ /**
+ * Table name in database.
+ *
+ * @param dbTbl Table name in database.
+ * @return {@code this} for chaining.
+ */
+ public CacheJdbcPojoStoreType setDatabaseTable(String dbTbl) {
+ this.dbTbl = dbTbl;
+
+ return this;
+ }
+
+ /**
+ * Gets key type.
+ *
+ * @return Key type.
+ */
+ public String getKeyType() {
+ return keyType;
+ }
+
+ /**
+ * Sets key type.
+ *
+ * @param keyType Key type.
+ * @return {@code this} for chaining.
+ */
+ public CacheJdbcPojoStoreType setKeyType(String keyType) {
+ this.keyType = keyType;
+
+ return this;
+ }
+
+ /**
+ * Sets key type.
+ *
+ * @param cls Key type class.
+ * @return {@code this} for chaining.
+ */
+ public CacheJdbcPojoStoreType setKeyType(Class<?> cls) {
+ setKeyType(cls.getName());
+
+ return this;
+ }
+
+ /**
+ * Gets value type.
+ *
+ * @return Key type.
+ */
+ public String getValueType() {
+ return valType;
+ }
+
+ /**
+ * Sets value type.
+ *
+ * @param valType Value type.
+ * @return {@code this} for chaining.
+ */
+ public CacheJdbcPojoStoreType setValueType(String valType) {
+ this.valType = valType;
+
+ return this;
+ }
+
+ /**
+ * Sets value type.
+ *
+ * @param cls Value type class.
+ * @return {@code this} for chaining.
+ */
+ public CacheJdbcPojoStoreType setValueType(Class<?> cls) {
+ setValueType(cls.getName());
+
+ return this;
+ }
+
+ /**
+ * Gets optional persistent key fields (needed only if {@link CacheJdbcPojoStore} is used).
+ *
+ * @return Persistent key fields.
+ */
+ public CacheJdbcPojoStoreTypeField[] getKeyFields() {
+ return keyFields;
+ }
+
+ /**
+ * Sets optional persistent key fields (needed only if {@link CacheJdbcPojoStore} is used).
+ *
+ * @param keyFields Persistent key fields.
+ * @return {@code this} for chaining.
+ */
+ public CacheJdbcPojoStoreType setKeyFields(CacheJdbcPojoStoreTypeField... keyFields) {
+ this.keyFields = keyFields;
+
+ return this;
+ }
+
+ /**
+ * Gets optional persistent value fields (needed only if {@link CacheJdbcPojoStore} is used).
+ *
+ * @return Persistent value fields.
+ */
+ public CacheJdbcPojoStoreTypeField[] getValueFields() {
+ return valFields;
+ }
+
+ /**
+ * Sets optional persistent value fields (needed only if {@link CacheJdbcPojoStore} is used).
+ *
+ * @param valFields Persistent value fields.
+ * @return {@code this} for chaining.
+ */
+ public CacheJdbcPojoStoreType setValueFields(CacheJdbcPojoStoreTypeField... valFields) {
+ this.valFields = valFields;
+
+ return this;
+ }
+
+ /**
+ * Gets how value stored in cache.
+ *
+ * @return {@code true} if object is stored as IgniteObject.
+ */
+ public boolean isKeepSerialized() {
+ return keepSerialized;
+ }
+
+ /**
+ * Sets how value stored in cache.
+ *
+ * @param keepSerialized {@code true} if object is stored as IgniteObject.
+ * @return {@code this} for chaining.
+ */
+ public CacheJdbcPojoStoreType setKeepSerialized(boolean keepSerialized) {
+ this.keepSerialized = keepSerialized;
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java
new file mode 100644
index 0000000..46a2647
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTypeField.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Description of how field declared in database and in cache.
+ */
+public class CacheJdbcPojoStoreTypeField implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Field JDBC type in database. */
+ private int dbFieldType;
+
+ /** Field name in database. */
+ private String dbFieldName;
+
+ /** Field java type. */
+ private Class<?> javaFieldType;
+
+ /** Field name in java object. */
+ private String javaFieldName;
+
+ /**
+ * Default constructor.
+ */
+ public CacheJdbcPojoStoreTypeField() {
+ // No-op.
+ }
+
+ /**
+ * Full constructor.
+ *
+ * @param dbFieldType Field JDBC type in database.
+ * @param dbFieldName Field name in database.
+ * @param javaFieldType Field java type.
+ * @param javaFieldName Field name in java object.
+ */
+ public CacheJdbcPojoStoreTypeField(int dbFieldType, String dbFieldName, Class<?> javaFieldType, String javaFieldName) {
+ this.dbFieldType = dbFieldType;
+ this.dbFieldName = dbFieldName;
+ this.javaFieldType = javaFieldType;
+ this.javaFieldName = javaFieldName;
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param field Field to copy.
+ */
+ public CacheJdbcPojoStoreTypeField(CacheJdbcPojoStoreTypeField field) {
+ this(field.getDatabaseFieldType(), field.getDatabaseFieldName(),
+ field.getJavaFieldType(), field.getJavaFieldName());
+ }
+
+ /**
+ * @return Column JDBC type in database.
+ */
+ public int getDatabaseFieldType() {
+ return dbFieldType;
+ }
+
+ /**
+ * @param dbType Column JDBC type in database.
+ */
+ public void setDatabaseFieldType(int dbType) {
+ this.dbFieldType = dbType;
+ }
+
+
+ /**
+ * @return Column name in database.
+ */
+ public String getDatabaseFieldName() {
+ return dbFieldName;
+ }
+
+ /**
+ * @param dbName Column name in database.
+ */
+ public void setDatabaseFieldName(String dbName) {
+ this.dbFieldName = dbName;
+ }
+
+ /**
+ * @return Field java type.
+ */
+ public Class<?> getJavaFieldType() {
+ return javaFieldType;
+ }
+
+ /**
+ * @param javaType Corresponding java type.
+ */
+ public void setJavaFieldType(Class<?> javaType) {
+ this.javaFieldType = javaType;
+ }
+
+ /**
+ * @return Field name in java object.
+ */
+ public String getJavaFieldName() {
+ return javaFieldName;
+ }
+
+ /**
+ * @param javaName Field name in java object.
+ */
+ public void setJavaFieldName(String javaName) {
+ this.javaFieldName = javaName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof CacheJdbcPojoStoreTypeField))
+ return false;
+
+ CacheJdbcPojoStoreTypeField that = (CacheJdbcPojoStoreTypeField)o;
+
+ return dbFieldType == that.dbFieldType && dbFieldName.equals(that.dbFieldName) &&
+ javaFieldType == that.javaFieldType && javaFieldName.equals(that.javaFieldName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = dbFieldType;
+ res = 31 * res + dbFieldName.hashCode();
+
+ res = 31 * res + javaFieldType.hashCode();
+ res = 31 * res + javaFieldName.hashCode();
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheJdbcPojoStoreTypeField.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
index 0ad2cad..b2d871c 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
@@ -17,23 +17,19 @@
package org.apache.ignite.cache.store.jdbc;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.cache.integration.CacheWriterException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.CacheTypeMetadata;
-import org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect;
-import org.apache.ignite.cache.store.jdbc.dialect.JdbcDialect;
+
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
import org.apache.ignite.cache.store.jdbc.model.Organization;
import org.apache.ignite.cache.store.jdbc.model.OrganizationKey;
import org.apache.ignite.cache.store.jdbc.model.Person;
@@ -41,16 +37,11 @@ import org.apache.ignite.cache.store.jdbc.model.PersonComplexKey;
import org.apache.ignite.cache.store.jdbc.model.PersonKey;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.cache.GridAbstractCacheStoreSelfTest;
import org.h2.jdbcx.JdbcConnectionPool;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
-import org.springframework.context.support.GenericApplicationContext;
-import org.springframework.core.io.UrlResource;
/**
* Class for {@code PojoCacheStore} tests.
@@ -59,9 +50,6 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
/** DB connection URL. */
private static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1";
- /** Default config with mapping. */
- private static final String DFLT_MAPPING_CONFIG = "modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml";
-
/** Organization count. */
protected static final int ORGANIZATION_CNT = 1000;
@@ -77,71 +65,89 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
/** {@inheritDoc} */
@Override protected CacheJdbcPojoStore<Object, Object> store() {
- CacheJdbcPojoStore<Object, Object> store = new CacheJdbcPojoStore<>();
-
-// PGPoolingDataSource ds = new PGPoolingDataSource();
-// ds.setUser("postgres");
-// ds.setPassword("postgres");
-// ds.setServerName("ip");
-// ds.setDatabaseName("postgres");
-// store.setDataSource(ds);
-
-// MysqlDataSource ds = new MysqlDataSource();
-// ds.setURL("jdbc:mysql://ip:port/dbname");
-// ds.setUser("mysql");
-// ds.setPassword("mysql");
-
+ CacheJdbcPojoStoreFactory<Object, Object> storeFactory = new CacheJdbcPojoStoreFactory<>();
+
+ CacheJdbcPojoStoreConfiguration storeCfg = new CacheJdbcPojoStoreConfiguration();
+ storeCfg.setDialect(new H2Dialect());
+
+ CacheJdbcPojoStoreType[] storeTypes = new CacheJdbcPojoStoreType[6];
+
+ storeTypes[0] = new CacheJdbcPojoStoreType();
+ storeTypes[0].setDatabaseSchema("PUBLIC");
+ storeTypes[0].setDatabaseTable("ORGANIZATION");
+ storeTypes[0].setKeyType("org.apache.ignite.cache.store.jdbc.model.OrganizationKey");
+ storeTypes[0].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"));
+
+ storeTypes[0].setValueType("org.apache.ignite.cache.store.jdbc.model.Organization");
+ storeTypes[0].setValueFields(
+ new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"),
+ new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"),
+ new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "CITY", String.class, "city"));
+
+ storeTypes[1] = new CacheJdbcPojoStoreType();
+ storeTypes[1].setDatabaseSchema("PUBLIC");
+ storeTypes[1].setDatabaseTable("PERSON");
+ storeTypes[1].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonKey");
+ storeTypes[1].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"));
+
+ storeTypes[1].setValueType("org.apache.ignite.cache.store.jdbc.model.Person");
+ storeTypes[1].setValueFields(
+ new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"),
+ new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"),
+ new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"));
+
+ storeTypes[2] = new CacheJdbcPojoStoreType();
+ storeTypes[2].setDatabaseSchema("PUBLIC");
+ storeTypes[2].setDatabaseTable("PERSON_COMPLEX");
+ storeTypes[2].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonComplexKey");
+ storeTypes[2].setKeyFields(
+ new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", int.class, "id"),
+ new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", int.class, "orgId"),
+ new CacheJdbcPojoStoreTypeField(Types.INTEGER, "CITY_ID", int.class, "cityId"));
+
+ storeTypes[2].setValueType("org.apache.ignite.cache.store.jdbc.model.Person");
+ storeTypes[2].setValueFields(
+ new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"),
+ new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"),
+ new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"),
+ new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "SALARY", Integer.class, "salary"));
+
+ storeTypes[3] = new CacheJdbcPojoStoreType();
+ storeTypes[3].setDatabaseSchema("PUBLIC");
+ storeTypes[3].setDatabaseTable("TIMESTAMP_ENTRIES");
+ storeTypes[3].setKeyType("java.sql.Timestamp");
+ storeTypes[3].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.TIMESTAMP, "KEY", Timestamp.class, null));
+
+ storeTypes[3].setValueType("java.lang.Integer");
+ storeTypes[3].setValueFields(new CacheJdbcPojoStoreTypeField(Types.INTEGER, "VAL", Integer.class, null));
+
+ storeTypes[4] = new CacheJdbcPojoStoreType();
+ storeTypes[4].setDatabaseSchema("PUBLIC");
+ storeTypes[4].setDatabaseTable("STRING_ENTRIES");
+ storeTypes[4].setKeyType("java.lang.String");
+ storeTypes[4].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "KEY", String.class, null));
+
+ storeTypes[4].setValueType("java.lang.String");
+ storeTypes[4].setValueFields(new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "VAL", Integer.class, null));
+
+ storeTypes[5] = new CacheJdbcPojoStoreType();
+ storeTypes[5].setDatabaseSchema("PUBLIC");
+ storeTypes[5].setDatabaseTable("UUID_ENTRIES");
+ storeTypes[5].setKeyType("java.util.UUID");
+ storeTypes[5].setKeyFields(new CacheJdbcPojoStoreTypeField(Types.BINARY, "KEY", UUID.class, null));
+
+ storeTypes[5].setValueType("java.util.UUID");
+ storeTypes[5].setValueFields(new CacheJdbcPojoStoreTypeField(Types.BINARY, "VAL", UUID.class, null));
+
+ storeCfg.setTypes(storeTypes);
+
+ storeFactory.setConfiguration(storeCfg);
+
+ CacheJdbcPojoStore<Object, Object> store = storeFactory.create();
+
+ // H2 DataSource
store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", ""));
- URL cfgUrl;
-
- try {
- cfgUrl = new URL(DFLT_MAPPING_CONFIG);
- }
- catch (MalformedURLException ignore) {
- cfgUrl = U.resolveIgniteUrl(DFLT_MAPPING_CONFIG);
- }
-
- if (cfgUrl == null)
- throw new IgniteException("Failed to resolve metadata path: " + DFLT_MAPPING_CONFIG);
-
- try {
- GenericApplicationContext springCtx = new GenericApplicationContext();
-
- new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new UrlResource(cfgUrl));
-
- springCtx.refresh();
-
- Collection<CacheTypeMetadata> typeMeta = springCtx.getBeansOfType(CacheTypeMetadata.class).values();
-
- Map<Integer, Map<Object, CacheAbstractJdbcStore.EntryMapping>> cacheMappings = new HashMap<>();
-
- JdbcDialect dialect = store.resolveDialect();
-
- GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "dialect", dialect);
-
- Map<Object, CacheAbstractJdbcStore.EntryMapping> entryMappings = U.newHashMap(typeMeta.size());
-
- for (CacheTypeMetadata type : typeMeta)
- entryMappings.put(store.keyTypeId(type.getKeyType()),
- new CacheAbstractJdbcStore.EntryMapping(null, dialect, type));
-
- store.prepareBuilders(null, typeMeta);
-
- cacheMappings.put(null, entryMappings);
-
- GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "cacheMappings", cacheMappings);
- }
- catch (BeansException e) {
- if (X.hasCause(e, ClassNotFoundException.class))
- throw new IgniteException("Failed to instantiate Spring XML application context " +
- "(make sure all classes used in Spring configuration are present at CLASSPATH) " +
- "[springUrl=" + cfgUrl + ']', e);
- else
- throw new IgniteException("Failed to instantiate Spring XML application context [springUrl=" +
- cfgUrl + ", err=" + e.getMessage() + ']', e);
- }
-
return store;
}
@@ -152,7 +158,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
@Override protected void inject(CacheJdbcPojoStore<Object, Object> store) throws Exception {
getTestResources().inject(store);
- GridTestUtils.setFieldValue(store, CacheAbstractJdbcStore.class, "ses", ses);
+ GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, "ses", ses);
}
/** {@inheritDoc} */
@@ -224,7 +230,6 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
super.beforeTest();
}
-
/**
* @throws Exception If failed.
*/
@@ -274,7 +279,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
if (i > 0)
prnComplexStmt.setInt(5, 1000 + i * 500);
else // Add person with null salary
- prnComplexStmt.setNull(5, java.sql.Types.INTEGER);
+ prnComplexStmt.setNull(5, Types.INTEGER);
prnComplexStmt.addBatch();
}
@@ -302,9 +307,9 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
Person val = (Person)v;
- assert key.getId() == val.getId();
- assert key.getOrgId() == val.getOrgId();
- assertEquals("name" + key.getId(), val.getName());
+ assertTrue("Key ID should be the same as value ID", key.getId() == val.getId());
+ assertTrue("Key orgID should be the same as value orgID", key.getOrgId() == val.getOrgId());
+ assertEquals("name" + key.getId(), val.getName());
prnComplexKeys.add((PersonComplexKey)k);
}
@@ -351,25 +356,23 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
* @throws Exception If failed.
*/
public void testWriteRetry() throws Exception {
+ CacheJdbcPojoStore<Object, Object> store = store();
+
// Special dialect that will skip updates, to test write retry.
- BasicJdbcDialect dialect = new BasicJdbcDialect() {
+ store.setDialect(new H2Dialect() {
/** {@inheritDoc} */
- @Override public String updateQuery(String tblName, Collection<String> keyCols, Iterable<String> valCols) {
- return super.updateQuery(tblName, keyCols, valCols) + " AND 1 = 0";
+ @Override public boolean hasMerge() {
+ return false;
}
- };
-
- store.setDialect(dialect);
-
- Map<String, Map<Object, CacheAbstractJdbcStore.EntryMapping>> cacheMappings =
- GridTestUtils.getFieldValue(store, CacheAbstractJdbcStore.class, "cacheMappings");
-
- CacheAbstractJdbcStore.EntryMapping em = cacheMappings.get(null).get(OrganizationKey.class);
- CacheTypeMetadata typeMeta = GridTestUtils.getFieldValue(em, CacheAbstractJdbcStore.EntryMapping.class, "typeMeta");
+ /** {@inheritDoc} */
+ @Override public String updateQuery(String tblName, Collection<String> keyCols,
+ Iterable<String> valCols) {
+ return super.updateQuery(tblName, keyCols, valCols) + " AND 1 = 0";
+ }
+ });
- cacheMappings.get(null).put(OrganizationKey.class,
- new CacheAbstractJdbcStore.EntryMapping(null, dialect, typeMeta));
+ inject(store);
Connection conn = store.openConnection(false);
@@ -392,6 +395,8 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
try {
store.write(new CacheEntryImpl<>(k1, v1));
+
+ fail("CacheWriterException wasn't thrown.");
}
catch (CacheWriterException e) {
if (!e.getMessage().startsWith("Failed insert entry in database, violate a unique index or primary key") ||
@@ -418,4 +423,4 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
assertNull(store.load(k));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
index 757cedd..42cc4c9 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java
@@ -60,7 +60,7 @@ import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsyn
/**
*
*/
-public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends CacheAbstractJdbcStore>
+public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends CacheJdbcPojoStore>
extends GridCommonAbstractTest {
/** Default config with mapping. */
private static final String DFLT_MAPPING_CONFIG = "modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml";
@@ -78,7 +78,7 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
private static final int BATCH_CNT = 2000;
/** Cache store. */
- protected static CacheAbstractJdbcStore store;
+ protected static CacheJdbcPojoStore store;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
@@ -308,4 +308,4 @@ public abstract class CacheJdbcStoreAbstractMultithreadedSelfTest<T extends Cach
}
}, 8, "tx");
}
-}
\ No newline at end of file
+}
[2/2] ignite git commit: IGNITE-1753 Reworking JDBC POJO store to new
configuration. Merging POJO and Portable stores in single store. Fixed
several tests.
Posted by ak...@apache.org.
IGNITE-1753 Reworking JDBC POJO store to new configuration. Merging POJO and Portable stores in single store. Fixed several tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c1a71b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c1a71b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c1a71b2
Branch: refs/heads/ignite-1753-1282
Commit: 8c1a71b28f4ffc73457c1a58efc735bbc2ad2ce6
Parents: 9d67c20
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Oct 26 17:47:43 2015 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Oct 26 17:47:43 2015 +0700
----------------------------------------------------------------------
.../store/jdbc/CacheAbstractJdbcStore.java | 3 +-
.../cache/store/jdbc/CacheJdbcPojoStore.java | 2209 +++++++++++++++++-
.../jdbc/CacheJdbcPojoStoreConfiguration.java | 230 ++
.../store/jdbc/CacheJdbcPojoStoreFactory.java | 90 +-
.../store/jdbc/CacheJdbcPojoStoreType.java | 272 +++
.../store/jdbc/CacheJdbcPojoStoreTypeField.java | 160 ++
.../store/jdbc/CacheJdbcPojoStoreTest.java | 205 +-
...eJdbcStoreAbstractMultithreadedSelfTest.java | 6 +-
8 files changed, 2953 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index 6e27b9a..bd04fe7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -113,6 +113,7 @@ import static java.sql.Statement.SUCCESS_NO_INFO;
* ...
* </pre>
*/
+@Deprecated
public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, LifecycleAware {
/** Max attempt write count. */
protected static final int MAX_ATTEMPT_WRITE_COUNT = 2;
@@ -1821,4 +1822,4 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8c1a71b2/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
index d78ea48..6b3473a 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
@@ -19,36 +19,1814 @@ package org.apache.ignite.cache.store.jdbc;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.sql.BatchUpdateException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.sql.DataSource;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgnitePortables;
import org.apache.ignite.cache.CacheTypeFieldMetadata;
import org.apache.ignite.cache.CacheTypeMetadata;
import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect;
+import org.apache.ignite.cache.store.jdbc.dialect.DB2Dialect;
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
+import org.apache.ignite.cache.store.jdbc.dialect.JdbcDialect;
+import org.apache.ignite.cache.store.jdbc.dialect.MySQLDialect;
+import org.apache.ignite.cache.store.jdbc.dialect.OracleDialect;
+import org.apache.ignite.cache.store.jdbc.dialect.SQLServerDialect;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.portable.PortableBuilder;
+import org.apache.ignite.portable.PortableObject;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
+import static java.sql.Statement.EXECUTE_FAILED;
+import static java.sql.Statement.SUCCESS_NO_INFO;
+import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_BATCH_SIZE;
+import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_WRITE_ATTEMPTS;
+import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreConfiguration.DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+
/**
* Implementation of {@link CacheStore} backed by JDBC and POJO via reflection.
*
- * This implementation stores objects in underlying database using java beans mapping description via reflection.
- * <p>
+ * This implementation stores objects in underlying database using java beans mapping description via reflection. <p>
* Use {@link CacheJdbcPojoStoreFactory} factory to pass {@link CacheJdbcPojoStore} to {@link CacheConfiguration}.
*/
-public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
+public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAware {
+ /** Connection attribute property name. */
+ private static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
+
+ /** Auto-injected store session. */
+ @CacheStoreSessionResource
+ private CacheStoreSession ses;
+
+ /** Auto injected ignite instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** Auto-injected logger instance. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Lock for metadata cache. */
+ @GridToStringExclude
+ private final Lock cacheMappingsLock = new ReentrantLock();
+
+ /** Data source. */
+ private DataSource dataSrc;
+
+ /** Cache with entry mapping description. (cache name, (key id, mapping description)). */
+ private volatile Map<String, Map<Object, EntryMapping>> cacheMappings = Collections.emptyMap();
+
+ /** Maximum batch size for writeAll and deleteAll operations. */
+ private int batchSz = DFLT_BATCH_SIZE;
+
+ /** Database dialect. */
+ private JdbcDialect dialect;
+
+ /** Max workers thread count. These threads are responsible for load cache. */
+ private int maxPoolSz = Runtime.getRuntime().availableProcessors();
+
+ /** Maximum write attempts in case of database error. */
+ private int maxWrtAttempts = DFLT_WRITE_ATTEMPTS;
+
+ /** Parallel load cache minimum threshold. If {@code 0} then load sequentially. */
+ private int parallelLoadCacheMinThreshold = DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
+
+ /** Types that store could process. */
+ private CacheJdbcPojoStoreType[] types;
+
+ /** Map for quick check whether type is POJO or Portable. */
+ private volatile Map<String, Map<String, Boolean>> keepSerializedTypes = new HashMap<>();
+
+ /** POJO methods cache. */
+ private volatile Map<String, Map<String, PojoMethodsCache>> pojoMethods = Collections.emptyMap();
+
+ /** Portables builders cache. */
+ private volatile Map<String, Map<String, PortableBuilder>> portableBuilders = Collections.emptyMap();
+
+ /**
+ * Checks for POJO/portable format.
+ *
+ * @param cacheName Cache name to get types settings.
+ * @param typeName Type name to check for POJO/portable format.
+ * @return {@code true} If portable format configured.
+ * @throws CacheException In case of error.
+ */
+ private boolean isKeepSerialized(String cacheName, String typeName) {
+ Map<String, Boolean> cacheTypes = keepSerializedTypes.get(cacheName);
+
+ if (cacheTypes == null)
+ throw new CacheException("Failed to find types metadata for cache: " + cacheName);
+
+ Boolean keepSerialized = cacheTypes.get(typeName);
+
+ if (keepSerialized == null)
+ throw new CacheException("Failed to find type metadata for type: " + typeName);
+
+ return keepSerialized;
+ }
+
+ /**
+ * Get field value from object for use as query parameter.
+ *
+ * @param cacheName Cache name.
+ * @param typeName Type name.
+ * @param fieldName Field name.
+ * @param obj Cache object.
+ * @return Field value from object.
+ * @throws CacheException in case of error.
+ */
+ @Nullable private Object extractParameter(@Nullable String cacheName, String typeName, String fieldName,
+ Object obj) throws CacheException {
+ return isKeepSerialized(cacheName, typeName)
+ ? extractPortableParameter(fieldName, obj)
+ : extractPojoParameter(cacheName, typeName, fieldName, obj);
+ }
+
+ /**
+ * Get field value from POJO for use as query parameter.
+ *
+ * @param cacheName Cache name.
+ * @param typeName Type name.
+ * @param fieldName Field name.
+ * @param obj Cache object.
+ * @return Field value from object.
+ * @throws CacheException in case of error.
+ */
+ @Nullable private Object extractPojoParameter(@Nullable String cacheName, String typeName, String fieldName,
+ Object obj) throws CacheException {
+ try {
+ Map<String, PojoMethodsCache> cacheMethods = pojoMethods.get(cacheName);
+
+ if (cacheMethods == null)
+ throw new CacheException("Failed to find POJO type metadata for cache: " + cacheName);
+
+ PojoMethodsCache mc = cacheMethods.get(typeName);
+
+ if (mc == null)
+ throw new CacheException("Failed to find POJO type metadata for type: " + typeName);
+
+ if (mc.simple)
+ return obj;
+
+ Method getter = mc.getters.get(fieldName);
+
+ if (getter == null)
+ throw new CacheLoaderException("Failed to find getter in POJO class [clsName=" + typeName +
+ ", prop=" + fieldName + "]");
+
+ return getter.invoke(obj);
+ }
+ catch (Exception e) {
+ throw new CacheException("Failed to read object of class: " + typeName, e);
+ }
+ }
+
+ /**
+ * Get field value from Portable for use as query parameter.
+ *
+ * @param fieldName Field name to extract query parameter for.
+ * @param obj Object to process.
+ * @return Field value from object.
+ * @throws CacheException in case of error.
+ */
+ private Object extractPortableParameter(String fieldName, Object obj) throws CacheException {
+ if (obj instanceof PortableObject) {
+ PortableObject pobj = (PortableObject)obj;
+
+ return pobj.field(fieldName);
+ }
+
+ throw new CacheException("Failed to read property value from non portable object [class name=" +
+ obj.getClass() + ", property=" + fieldName + "]");
+ }
+
+ /**
+ * Construct object from query result.
+ *
+ * @param <R> Type of result object.
+ * @param cacheName Cache name.
+ * @param typeName Type name.
+ * @param fields Fields descriptors.
+ * @param loadColIdxs Select query columns index.
+ * @param rs ResultSet.
+ * @return Constructed object.
+ * @throws CacheLoaderException If failed to construct cache object.
+ */
+ private <R> R buildObject(@Nullable String cacheName, String typeName,
+ CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs)
+ throws CacheLoaderException {
+ return (R)(isKeepSerialized(cacheName, typeName)
+ ? buildPortableObject(cacheName, typeName, fields, loadColIdxs, rs)
+ : buildPojoObject(cacheName, typeName, fields, loadColIdxs, rs));
+ }
+
+ /**
+ * Construct POJO from query result.
+ *
+ * @param cacheName Cache name.
+ * @param typeName Type name.
+ * @param fields Fields descriptors.
+ * @param loadColIdxs Select query columns index.
+ * @param rs ResultSet.
+ * @return Constructed POJO.
+ * @throws CacheLoaderException If failed to construct POJO.
+ */
+ private Object buildPojoObject(@Nullable String cacheName, String typeName,
+ CacheJdbcPojoStoreTypeField[] fields, Map<String, Integer> loadColIdxs, ResultSet rs)
+ throws CacheLoaderException {
+
+ Map<String, PojoMethodsCache> z = pojoMethods.get(cacheName);
+
+ if (z == null)
+ throw new CacheLoaderException("Failed to find POJO types metadata for cache: " + cacheName);
+
+ PojoMethodsCache mc = z.get(typeName);
+
+ if (mc == null)
+ throw new CacheLoaderException("Failed to find POJO type metadata for type: " + typeName);
+
+ try {
+ if (mc.simple) {
+ CacheJdbcPojoStoreTypeField field = fields[0];
+
+ return getColumnValue(rs, loadColIdxs.get(field.getDatabaseFieldName()), mc.cls);
+ }
+
+ Object obj = mc.ctor.newInstance();
+
+ for (CacheJdbcPojoStoreTypeField field : fields) {
+ String fldJavaName = field.getJavaFieldName();
+
+ Method setter = mc.setters.get(fldJavaName);
+
+ if (setter == null)
+ throw new IllegalStateException("Failed to find setter in POJO class [clsName=" + typeName +
+ ", prop=" + fldJavaName + "]");
+
+ String fldDbName = field.getDatabaseFieldName();
+
+ Integer colIdx = loadColIdxs.get(fldDbName);
+
+ try {
+ setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaFieldType()));
+ }
+ catch (Exception e) {
+ throw new IllegalStateException("Failed to set property in POJO class [clsName=" + typeName +
+ ", prop=" + fldJavaName + ", col=" + colIdx + ", dbName=" + fldDbName + "]", e);
+ }
+ }
+
+ return obj;
+ }
+ catch (SQLException e) {
+ throw new CacheLoaderException("Failed to read object of class: " + typeName, e);
+ }
+ catch (Exception e) {
+ throw new CacheLoaderException("Failed to construct instance of class: " + typeName, e);
+ }
+ }
+
+ /**
+ * Construct portable object from query result.
+ *
+ * @param cacheName Cache name.
+ * @param typeName Type name.
+ * @param fields Fields descriptors.
+ * @param loadColIdxs Select query columns index.
+ * @param rs ResultSet.
+ * @return Constructed portable object.
+ * @throws CacheLoaderException If failed to construct portable object.
+ */
+ protected PortableObject buildPortableObject(String cacheName, String typeName, CacheJdbcPojoStoreTypeField[] fields,
+ Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheException {
+ Map<String, PortableBuilder> cachePortables = portableBuilders.get(cacheName);
+
+ if (cachePortables == null)
+ throw new CacheException("Failed to find portable builders for cache: " + cacheName);
+
+ PortableBuilder builder = cachePortables.get(typeName);
+
+ if (builder == null)
+ throw new CacheException("Failed to find portable builder for type: " + typeName);
+
+ try {
+ for (CacheJdbcPojoStoreTypeField field : fields) {
+ Class<?> type = field.getJavaFieldType();
+
+ Integer colIdx = loadColIdxs.get(field.getDatabaseFieldName());
+
+ builder.setField(field.getJavaFieldName(), getColumnValue(rs, colIdx, type));
+ }
+
+ return builder.build();
+ }
+ catch (SQLException e) {
+ throw new CacheException("Failed to read portable object", e);
+ }
+ }
+
+ /**
+ * Calculate type ID for object.
+ *
+ * @param obj Object to calculate type ID for.
+ * @return Type ID.
+ * @throws CacheException If failed to calculate type ID for given object.
+ */
+ private Object typeIdForObject(Object obj) throws CacheException {
+ if (obj instanceof PortableObject)
+ return ((PortableObject)obj).typeId();
+
+ return obj.getClass();
+ }
+
+ /**
+ * Calculate type ID for given type name.
+ *
+ * @param keepSerialized If {@code true} then calculate type ID for portable object otherwise for POJO.
+ * @param typeName String description of type name.
+ * @return Type ID.
+ * @throws CacheException If failed to get type ID for given type name.
+ */
+ private Object typeIdForTypeName(boolean keepSerialized, String typeName) throws CacheException {
+ if (keepSerialized)
+ return ignite().portables().typeId(typeName);
+
+ try {
+ return Class.forName(typeName);
+ }
+ catch (ClassNotFoundException e) {
+ throw new CacheException("Failed to find class: " + typeName, e);
+ }
+ }
+
+ /**
+ * Prepare builders for POJOs via reflection (getters and setters).
+ *
+ * @param cacheName Cache name to prepare builders for.
+ * @param types Collection of types.
+ * @throws CacheException If failed to prepare internal builders for types.
+ */
+ private void preparePojoBuilders(@Nullable String cacheName, Collection<CacheJdbcPojoStoreType> types)
+ throws CacheException {
+ Map<String, PojoMethodsCache> typeMethods = U.newHashMap(types.size() * 2);
+
+ for (CacheJdbcPojoStoreType type : types) {
+ if (!type.isKeepSerialized()) {
+ // TODO check for duplicates
+
+ String keyType = type.getKeyType();
+ typeMethods.put(keyType, new PojoMethodsCache(keyType, type.getKeyFields()));
+
+ String valType = type.getValueType();
+ typeMethods.put(valType, new PojoMethodsCache(valType, type.getValueFields()));
+ }
+ }
+
+ if (!typeMethods.isEmpty()) {
+ Map<String, Map<String, PojoMethodsCache>> newMtdsCache = new HashMap<>(pojoMethods);
+
+ newMtdsCache.put(cacheName, typeMethods);
+
+ pojoMethods = newMtdsCache;
+ }
+ }
+
+ /**
+ * Prepare builders for portable objects via portable builder.
+ *
+ * @param cacheName Cache name to prepare builders for.
+ * @param types Collection of types.
+ * @throws CacheException If failed to prepare internal builders for types.
+ */
+ private void preparePortableBuilders(@Nullable String cacheName, Collection<CacheJdbcPojoStoreType> types)
+ throws CacheException {
+ Map<String, PortableBuilder> typeBuilders = U.newHashMap(types.size() * 2);
+
+ for (CacheJdbcPojoStoreType type : types) {
+ if (type.isKeepSerialized()) {
+ Ignite ignite = ignite();
+
+ IgnitePortables portables = ignite.portables();
+
+ String keyType = type.getKeyType();
+ int keyTypeId = portables.typeId(keyType);
+ typeBuilders.put(keyType, portables.builder(keyTypeId));
+
+ String valType = type.getValueType();
+ int valTypeId = portables.typeId(valType);
+ typeBuilders.put(valType, portables.builder(valTypeId));
+ }
+ }
+
+ if (!typeBuilders.isEmpty()) {
+ Map<String, Map<String, PortableBuilder>> newBuilders = new HashMap<>(portableBuilders);
+
+ newBuilders.put(cacheName, typeBuilders);
+
+ portableBuilders = newBuilders;
+ }
+ }
+
+ /**
+ * Perform dialect resolution.
+ *
+ * @return The resolved dialect.
+ * @throws CacheException Indicates problems accessing the metadata.
+ */
+ private JdbcDialect resolveDialect() throws CacheException {
+ Connection conn = null;
+
+ String dbProductName = null;
+
+ try {
+ conn = openConnection(false);
+
+ dbProductName = conn.getMetaData().getDatabaseProductName();
+ }
+ catch (SQLException e) {
+ throw new CacheException("Failed access to metadata for detect database dialect.", e);
+ }
+ finally {
+ U.closeQuiet(conn);
+ }
+
+ if ("H2".equals(dbProductName))
+ return new H2Dialect();
+
+ if ("MySQL".equals(dbProductName))
+ return new MySQLDialect();
+
+ if (dbProductName.startsWith("Microsoft SQL Server"))
+ return new SQLServerDialect();
+
+ if ("Oracle".equals(dbProductName))
+ return new OracleDialect();
+
+ if (dbProductName.startsWith("DB2/"))
+ return new DB2Dialect();
+
+ U.warn(log, "Failed to resolve dialect (BasicJdbcDialect will be used): " + dbProductName);
+
+ return new BasicJdbcDialect();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ if (dataSrc == null)
+ throw new IgniteException("Failed to initialize cache store (data source is not provided).");
+
+ if (dialect == null) {
+ dialect = resolveDialect();
+
+ if (log.isDebugEnabled() && dialect.getClass() != BasicJdbcDialect.class)
+ log.debug("Resolved database dialect: " + U.getSimpleName(dialect.getClass()));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // No-op.
+ }
+
+ /**
+ * Gets connection from a pool.
+ *
+ * @param autocommit {@code true} If connection should use autocommit mode.
+ * @return Pooled connection.
+ * @throws SQLException In case of error.
+ */
+ protected Connection openConnection(boolean autocommit) throws SQLException {
+ Connection conn = dataSrc.getConnection();
+
+ conn.setAutoCommit(autocommit);
+
+ return conn;
+ }
+
+ /**
+ * @return Connection.
+ * @throws SQLException In case of error.
+ */
+ private Connection connection() throws SQLException {
+ CacheStoreSession ses = session();
+
+ if (ses.transaction() != null) {
+ Map<String, Connection> prop = ses.properties();
+
+ Connection conn = prop.get(ATTR_CONN_PROP);
+
+ if (conn == null) {
+ conn = openConnection(false);
+
+ // Store connection in session to used it for other operations in the same session.
+ prop.put(ATTR_CONN_PROP, conn);
+ }
+
+ return conn;
+ }
+ // Transaction can be null in case of simple load operation.
+ else
+ return openConnection(true);
+ }
+
+ /**
+ * Closes connection.
+ *
+ * @param conn Connection to close.
+ */
+ private void closeConnection(@Nullable Connection conn) {
+ CacheStoreSession ses = session();
+
+ // Close connection right away if there is no transaction.
+ if (ses.transaction() == null)
+ U.closeQuiet(conn);
+ }
+
+ /**
+ * Closes allocated resources depending on transaction status.
+ *
+ * @param conn Allocated connection.
+ * @param st Created statement,
+ */
+ private void end(@Nullable Connection conn, @Nullable Statement st) {
+ U.closeQuiet(st);
+
+ closeConnection(conn);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sessionEnd(boolean commit) throws CacheWriterException {
+ CacheStoreSession ses = session();
+
+ Transaction tx = ses.transaction();
+
+ if (tx != null) {
+ Connection conn = ses.<String, Connection>properties().remove(ATTR_CONN_PROP);
+
+ assert conn != null;
+
+ try {
+ if (commit)
+ conn.commit();
+ else
+ conn.rollback();
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException(
+ "Failed to end transaction [xid=" + tx.xid() + ", commit=" + commit + ']', e);
+ }
+ finally {
+ U.closeQuiet(conn);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Transaction ended [xid=" + tx.xid() + ", commit=" + commit + ']');
+ }
+ }
+
+ /**
+ * Retrieves the value of the designated column in the current row of this <code>ResultSet</code> object and will
+ * convert to the requested Java data type.
+ *
+ * @param rs Result set.
+ * @param colIdx Column index in result set.
+ * @param type Class representing the Java data type to convert the designated column to.
+ * @return Value in column.
+ * @throws SQLException If a database access error occurs or this method is called.
+ */
+ private Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) throws SQLException {
+ Object val = rs.getObject(colIdx);
+
+ if (val == null)
+ return null;
+
+ if (type == int.class)
+ return rs.getInt(colIdx);
+
+ if (type == long.class)
+ return rs.getLong(colIdx);
+
+ if (type == double.class)
+ return rs.getDouble(colIdx);
+
+ if (type == boolean.class || type == Boolean.class)
+ return rs.getBoolean(colIdx);
+
+ if (type == byte.class)
+ return rs.getByte(colIdx);
+
+ if (type == short.class)
+ return rs.getShort(colIdx);
+
+ if (type == float.class)
+ return rs.getFloat(colIdx);
+
+ if (type == Integer.class || type == Long.class || type == Double.class ||
+ type == Byte.class || type == Short.class || type == Float.class) {
+ Number num = (Number)val;
+
+ if (type == Integer.class)
+ return num.intValue();
+ else if (type == Long.class)
+ return num.longValue();
+ else if (type == Double.class)
+ return num.doubleValue();
+ else if (type == Byte.class)
+ return num.byteValue();
+ else if (type == Short.class)
+ return num.shortValue();
+ else if (type == Float.class)
+ return num.floatValue();
+ }
+
+ if (type == UUID.class) {
+ if (val instanceof UUID)
+ return val;
+
+ if (val instanceof byte[]) {
+ ByteBuffer bb = ByteBuffer.wrap((byte[])val);
+
+ long most = bb.getLong();
+ long least = bb.getLong();
+
+ return new UUID(most, least);
+ }
+
+ if (val instanceof String)
+ return UUID.fromString((String)val);
+ }
+
+ return val;
+ }
+
+ /**
+ * Construct load cache from range.
+ *
+ * @param em Type mapping description.
+ * @param clo Closure that will be applied to loaded values.
+ * @param lowerBound Lower bound for range.
+ * @param upperBound Upper bound for range.
+ * @return Callable for pool submit.
+ */
+ private Callable<Void> loadCacheRange(final EntryMapping em, final IgniteBiInClosure<K, V> clo,
+ @Nullable final Object[] lowerBound, @Nullable final Object[] upperBound) {
+ return new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Connection conn = null;
+
+ PreparedStatement stmt = null;
+
+ try {
+ conn = openConnection(true);
+
+ stmt = conn.prepareStatement(lowerBound == null && upperBound == null
+ ? em.loadCacheQry
+ : em.loadCacheRangeQuery(lowerBound != null, upperBound != null));
+
+ int ix = 1;
+
+ if (lowerBound != null)
+ for (int i = lowerBound.length; i > 0; i--)
+ for (int j = 0; j < i; j++)
+ stmt.setObject(ix++, lowerBound[j]);
+
+ if (upperBound != null)
+ for (int i = upperBound.length; i > 0; i--)
+ for (int j = 0; j < i; j++)
+ stmt.setObject(ix++, upperBound[j]);
+
+ ResultSet rs = stmt.executeQuery();
+
+ while (rs.next()) {
+ K key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
+ V val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+
+ clo.apply(key, val);
+ }
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException("Failed to load cache", e);
+ }
+ finally {
+ U.closeQuiet(stmt);
+
+ U.closeQuiet(conn);
+ }
+
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Construct load cache in one select.
+ *
+ * @param m Type mapping description.
+ * @param clo Closure for loaded values.
+ * @return Callable for pool submit.
+ */
+ private Callable<Void> loadCacheFull(EntryMapping m, IgniteBiInClosure<K, V> clo) {
+ return loadCacheRange(m, clo, null, null);
+ }
+
+ /**
+ * Object is a simple type.
+ *
+ * @param cls Class.
+ * @return {@code True} if object is a simple type.
+ */
+ private static boolean simpleType(Class<?> cls) {
+ return (Number.class.isAssignableFrom(cls) || String.class.isAssignableFrom(cls) ||
+ java.util.Date.class.isAssignableFrom(cls) || Boolean.class.isAssignableFrom(cls) ||
+ UUID.class.isAssignableFrom(cls));
+ }
+
+ /**
+ * @param cacheName Cache name to check mapping for.
+ * @param clsName Class name.
+ * @param fields Fields descriptors.
+ * @throws CacheException If failed to check type metadata.
+ */
+ private static void checkMapping(@Nullable String cacheName, String clsName,
+ CacheJdbcPojoStoreTypeField[] fields) throws CacheException {
+ try {
+ Class<?> cls = Class.forName(clsName);
+
+ if (simpleType(cls)) {
+ if (fields.length != 1)
+ throw new CacheException("More than one field for simple type [cache name=" + cacheName
+ + ", type=" + clsName + " ]");
+
+ CacheJdbcPojoStoreTypeField field = fields[0];
+
+ if (field.getDatabaseFieldName() == null)
+ throw new CacheException("Missing database name in mapping description [cache name=" + cacheName
+ + ", type=" + clsName + " ]");
+
+ field.setJavaFieldType(cls);
+ }
+ else
+ for (CacheJdbcPojoStoreTypeField field : fields) {
+ if (field.getDatabaseFieldName() == null)
+ throw new CacheException("Missing database name in mapping description [cache name=" + cacheName
+ + ", type=" + clsName + " ]");
+
+ if (field.getJavaFieldName() == null)
+ throw new CacheException("Missing field name in mapping description [cache name=" + cacheName
+ + ", type=" + clsName + " ]");
+
+ if (field.getJavaFieldType() == null)
+ throw new CacheException("Missing field type in mapping description [cache name=" + cacheName
+ + ", type=" + clsName + " ]");
+ }
+ }
+ catch (ClassNotFoundException e) {
+ throw new CacheException("Failed to find class: " + clsName, e);
+ }
+ }
+
+ /**
+ * For backward compatibility translate old field type descriptors to new format.
+ *
+ * @param oldFields Fields in old format.
+ * @return Fields in new format.
+ */
+ @Deprecated
+ private CacheJdbcPojoStoreTypeField[] translateFields(Collection<CacheTypeFieldMetadata> oldFields) {
+ CacheJdbcPojoStoreTypeField[] newFields = new CacheJdbcPojoStoreTypeField[oldFields.size()];
+
+ int idx = 0;
+
+ for (CacheTypeFieldMetadata oldField : oldFields) {
+ newFields[idx] = new CacheJdbcPojoStoreTypeField(oldField.getDatabaseType(), oldField.getDatabaseName(),
+ oldField.getJavaType(), oldField.getJavaName());
+
+ idx++;
+ }
+
+ return newFields;
+ }
+
+ /**
+ * @param cacheName Cache name to check mappings for.
+ * @return Type mappings for specified cache name.
+ * @throws CacheException If failed to initialize cache mappings.
+ */
+ private Map<Object, EntryMapping> cacheMappings(@Nullable String cacheName) throws CacheException {
+ Map<Object, EntryMapping> entryMappings = cacheMappings.get(cacheName);
+
+ if (entryMappings != null)
+ return entryMappings;
+
+ cacheMappingsLock.lock();
+
+ try {
+ entryMappings = cacheMappings.get(cacheName);
+
+ if (entryMappings != null)
+ return entryMappings;
+
+ // If no types configured, check CacheTypeMetadata for backward compatibility.
+ if (types == null) {
+ CacheConfiguration ccfg = ignite().cache(cacheName).getConfiguration(CacheConfiguration.class);
+
+ Collection<CacheTypeMetadata> oldTypes = ccfg.getTypeMetadata();
+
+ types = new CacheJdbcPojoStoreType[oldTypes.size()];
+
+ int idx = 0;
+
+ for (CacheTypeMetadata oldType : oldTypes) {
+ CacheJdbcPojoStoreType newType = new CacheJdbcPojoStoreType();
+
+ newType.setCacheName(cacheName);
+
+ newType.setDatabaseSchema(oldType.getDatabaseSchema());
+ newType.setDatabaseTable(oldType.getDatabaseTable());
+
+ newType.setKeyType(oldType.getKeyType());
+ newType.setKeyFields(translateFields(oldType.getKeyFields()));
+
+ newType.setValueType(oldType.getValueType());
+ newType.setValueFields(translateFields(oldType.getValueFields()));
+
+ types[idx] = newType;
+
+ idx++;
+ }
+ }
+
+ List<CacheJdbcPojoStoreType> cacheTypes = new ArrayList<>(types.length);
+
+ for (CacheJdbcPojoStoreType type : types)
+ if ((cacheName != null && cacheName.equals(type.getCacheName())) ||
+ (cacheName == null && type.getCacheName() == null))
+ cacheTypes.add(type);
+
+ entryMappings = U.newHashMap(cacheTypes.size());
+
+ if (!cacheTypes.isEmpty()) {
+ Map<String, Boolean> tk = new HashMap<>(cacheTypes.size() * 2);
+
+ for (CacheJdbcPojoStoreType type : cacheTypes) {
+ boolean keepSerialized = type.isKeepSerialized();
+
+ String keyType = type.getKeyType();
+ String valType = type.getValueType();
+
+ tk.put(keyType, keepSerialized);
+ tk.put(valType, keepSerialized);
+
+ Object keyTypeId = typeIdForTypeName(keepSerialized, keyType);
+
+ if (entryMappings.containsKey(keyTypeId))
+ throw new CacheException("Key type must be unique in type metadata [cache name=" + cacheName +
+ ", key type=" + keyType + "]");
+
+ checkMapping(cacheName, keyType, type.getKeyFields());
+ checkMapping(cacheName, valType, type.getValueFields());
+
+ entryMappings.put(keyTypeId, new EntryMapping(cacheName, dialect, type));
+ }
+
+ keepSerializedTypes.put(cacheName, tk);
+
+ Map<String, Map<Object, EntryMapping>> mappings = new HashMap<>(cacheMappings);
+ mappings.put(cacheName, entryMappings);
+
+ preparePojoBuilders(cacheName, cacheTypes);
+ preparePortableBuilders(cacheName, cacheTypes);
+
+ cacheMappings = mappings;
+ }
+
+ return entryMappings;
+ }
+ finally {
+ cacheMappingsLock.unlock();
+ }
+ }
+
+ /**
+ * @param cacheName Cache name.
+ * @param keyTypeId Key type id.
+ * @param key Key object.
+ * @return Entry mapping.
+ * @throws CacheException If mapping for key was not found.
+ */
+ private EntryMapping entryMapping(String cacheName, Object keyTypeId, Object key) throws CacheException {
+ EntryMapping em = cacheMappings(cacheName).get(keyTypeId);
+
+ if (em == null) {
+ String maskedCacheName = U.maskName(cacheName);
+
+ throw new CacheException("Failed to find mapping description [key=" + key +
+ ", cache=" + maskedCacheName + "]. Please configure CacheJdbcPojoStoreType to associate '" + maskedCacheName + "' with JdbcPojoStore.");
+ }
+
+ return em;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(final IgniteBiInClosure<K, V> clo, @Nullable Object... args)
+ throws CacheLoaderException {
+ ExecutorService pool = null;
+
+ String cacheName = session().cacheName();
+
+ try {
+ pool = Executors.newFixedThreadPool(maxPoolSz);
+
+ Collection<Future<?>> futs = new ArrayList<>();
+
+ if (args != null && args.length > 0) {
+ if (args.length % 2 != 0)
+ throw new CacheLoaderException("Expected even number of arguments, but found: " + args.length);
+
+ if (log.isDebugEnabled())
+ log.debug("Start loading entries from db using user queries from arguments");
+
+ for (int i = 0; i < args.length; i += 2) {
+ String keyType = args[i].toString();
+
+ String selQry = args[i + 1].toString();
+
+ // We must build cache mappings first.
+ cacheMappings(cacheName);
+
+ EntryMapping em = entryMapping(cacheName, typeIdForTypeName(isKeepSerialized(cacheName, keyType),
+ keyType), keyType);
+
+ futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, selQry, clo)));
+ }
+ }
+ else {
+ Collection<EntryMapping> entryMappings = cacheMappings(session().cacheName()).values();
+
+ for (EntryMapping em : entryMappings) {
+ if (parallelLoadCacheMinThreshold > 0) {
+ log.debug("Multithread loading entries from db [cache name=" + cacheName +
+ ", key type=" + em.keyType() + " ]");
+
+ Connection conn = null;
+
+ try {
+ conn = connection();
+
+ PreparedStatement stmt = conn.prepareStatement(em.loadCacheSelRangeQry);
+
+ stmt.setInt(1, parallelLoadCacheMinThreshold);
+
+ ResultSet rs = stmt.executeQuery();
+
+ if (rs.next()) {
+ int keyCnt = em.keyCols.size();
+
+ Object[] upperBound = new Object[keyCnt];
+
+ for (int i = 0; i < keyCnt; i++)
+ upperBound[i] = rs.getObject(i + 1);
+
+ futs.add(pool.submit(loadCacheRange(em, clo, null, upperBound)));
+
+ while (rs.next()) {
+ Object[] lowerBound = upperBound;
+
+ upperBound = new Object[keyCnt];
+
+ for (int i = 0; i < keyCnt; i++)
+ upperBound[i] = rs.getObject(i + 1);
+
+ futs.add(pool.submit(loadCacheRange(em, clo, lowerBound, upperBound)));
+ }
+
+ futs.add(pool.submit(loadCacheRange(em, clo, upperBound, null)));
+ }
+ else
+ futs.add(pool.submit(loadCacheFull(em, clo)));
+ }
+ catch (SQLException ignored) {
+ futs.add(pool.submit(loadCacheFull(em, clo)));
+ }
+ finally {
+ U.closeQuiet(conn);
+ }
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Single thread loading entries from db [cache name=" + cacheName +
+ ", key type=" + em.keyType() + " ]");
+
+ futs.add(pool.submit(loadCacheFull(em, clo)));
+ }
+ }
+ }
+
+ for (Future<?> fut : futs)
+ U.get(fut);
+
+ if (log.isDebugEnabled())
+ log.debug("Cache loaded from db: " + cacheName);
+ }
+ catch (IgniteCheckedException e) {
+ throw new CacheLoaderException("Failed to load cache: " + cacheName, e.getCause());
+ }
+ finally {
+ U.shutdownNow(getClass(), pool, log);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public V load(K key) throws CacheLoaderException {
+ assert key != null;
+
+ EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key);
+
+ if (log.isDebugEnabled())
+ log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + "]");
+
+ Connection conn = null;
+
+ PreparedStatement stmt = null;
+
+ try {
+ conn = connection();
+
+ stmt = conn.prepareStatement(em.loadQrySingle);
+
+ fillKeyParameters(stmt, em, key);
+
+ ResultSet rs = stmt.executeQuery();
+
+ if (rs.next())
+ return buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+ }
+ catch (SQLException e) {
+ throw new CacheLoaderException("Failed to load object [table=" + em.fullTableName() +
+ ", key=" + key + "]", e);
+ }
+ finally {
+ end(conn, stmt);
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
+ assert keys != null;
+
+ Connection conn = null;
+
+ try {
+ conn = connection();
+
+ String cacheName = session().cacheName();
+
+ Map<Object, LoadWorker<K, V>> workers = U.newHashMap(cacheMappings(cacheName).size());
+
+ Map<K, V> res = new HashMap<>();
+
+ for (K key : keys) {
+ Object keyTypeId = typeIdForObject(key);
+
+ EntryMapping em = entryMapping(cacheName, keyTypeId, key);
+
+ LoadWorker<K, V> worker = workers.get(keyTypeId);
+
+ if (worker == null)
+ workers.put(keyTypeId, worker = new LoadWorker<>(conn, em));
+
+ worker.keys.add(key);
+
+ if (worker.keys.size() == em.maxKeysPerStmt)
+ res.putAll(workers.remove(keyTypeId).call());
+ }
+
+ for (LoadWorker<K, V> worker : workers.values())
+ res.putAll(worker.call());
+
+ return res;
+ }
+ catch (Exception e) {
+ throw new CacheWriterException("Failed to load entries from database", e);
+ }
+ finally {
+ closeConnection(conn);
+ }
+ }
+
+ /**
+ * @param insStmt Insert statement.
+ * @param updStmt Update statement.
+ * @param em Entry mapping.
+ * @param entry Cache entry.
+ * @throws CacheWriterException If failed to update record in database.
+ */
+ private void writeUpsert(PreparedStatement insStmt, PreparedStatement updStmt,
+ EntryMapping em, Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+ try {
+ CacheWriterException we = null;
+
+ for (int attempt = 0; attempt < maxWrtAttempts; attempt++) {
+ int paramIdx = fillValueParameters(updStmt, 1, em, entry.getValue());
+
+ fillKeyParameters(updStmt, paramIdx, em, entry.getKey());
+
+ if (updStmt.executeUpdate() == 0) {
+ paramIdx = fillKeyParameters(insStmt, em, entry.getKey());
+
+ fillValueParameters(insStmt, paramIdx, em, entry.getValue());
+
+ try {
+ insStmt.executeUpdate();
+
+ if (attempt > 0)
+ U.warn(log, "Entry was inserted in database on second try [table=" + em.fullTableName() +
+ ", entry=" + entry + "]");
+ }
+ catch (SQLException e) {
+ String sqlState = e.getSQLState();
+
+ SQLException nested = e.getNextException();
+
+ while (sqlState == null && nested != null) {
+ sqlState = nested.getSQLState();
+
+ nested = nested.getNextException();
+ }
+
+ // The error with code 23505 or 23000 is thrown when trying to insert a row that
+ // would violate a unique index or primary key.
+ if ("23505".equals(sqlState) || "23000".equals(sqlState)) {
+ if (we == null)
+ we = new CacheWriterException("Failed insert entry in database, violate a unique" +
+ " index or primary key [table=" + em.fullTableName() + ", entry=" + entry + "]");
+
+ we.addSuppressed(e);
+
+ U.warn(log, "Failed insert entry in database, violate a unique index or primary key" +
+ " [table=" + em.fullTableName() + ", entry=" + entry + "]");
+
+ continue;
+ }
+
+ throw new CacheWriterException("Failed insert entry in database [table=" + em.fullTableName() +
+ ", entry=" + entry, e);
+ }
+ }
+
+ if (attempt > 0)
+ U.warn(log, "Entry was updated in database on second try [table=" + em.fullTableName() +
+ ", entry=" + entry + "]");
+
+ return;
+ }
+
+ throw we;
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed update entry in database [table=" + em.fullTableName() +
+ ", entry=" + entry + "]", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+ assert entry != null;
+
+ K key = entry.getKey();
+
+ EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key);
+
+ if (log.isDebugEnabled())
+ log.debug("Start write entry to database [table=" + em.fullTableName() + ", entry=" + entry + "]");
+
+ Connection conn = null;
+
+ try {
+ conn = connection();
+
+ if (dialect.hasMerge()) {
+ PreparedStatement stmt = null;
+
+ try {
+ stmt = conn.prepareStatement(em.mergeQry);
+
+ int i = fillKeyParameters(stmt, em, key);
+
+ fillValueParameters(stmt, i, em, entry.getValue());
+
+ int updCnt = stmt.executeUpdate();
+
+ if (updCnt != 1)
+ U.warn(log, "Unexpected number of updated entries [table=" + em.fullTableName() +
+ ", entry=" + entry + "expected=1, actual=" + updCnt + "]");
+ }
+ finally {
+ U.closeQuiet(stmt);
+ }
+ }
+ else {
+ PreparedStatement insStmt = null;
+
+ PreparedStatement updStmt = null;
+
+ try {
+ insStmt = conn.prepareStatement(em.insQry);
+
+ updStmt = conn.prepareStatement(em.updQry);
+
+ writeUpsert(insStmt, updStmt, em, entry);
+ }
+ finally {
+ U.closeQuiet(insStmt);
+
+ U.closeQuiet(updStmt);
+ }
+ }
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to write entry to database [table=" + em.fullTableName() +
+ ", entry=" + entry + "]", e);
+ }
+ finally {
+ closeConnection(conn);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries)
+ throws CacheWriterException {
+ assert entries != null;
+
+ Connection conn = null;
+
+ try {
+ conn = connection();
+
+ String cacheName = session().cacheName();
+
+ Object currKeyTypeId = null;
+
+ if (dialect.hasMerge()) {
+ PreparedStatement mergeStmt = null;
+
+ try {
+ EntryMapping em = null;
+
+ LazyValue<Object[]> lazyEntries = new LazyValue<Object[]>() {
+ @Override public Object[] create() {
+ return entries.toArray();
+ }
+ };
+
+ int fromIdx = 0, prepared = 0;
+
+ for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+ K key = entry.getKey();
+
+ Object keyTypeId = typeIdForObject(key);
+
+ em = entryMapping(cacheName, keyTypeId, key);
+
+ if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
+ if (mergeStmt != null) {
+ if (log.isDebugEnabled())
+ log.debug("Write entries to db [cache name=" + cacheName +
+ ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+ executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
+
+ U.closeQuiet(mergeStmt);
+ }
+
+ mergeStmt = conn.prepareStatement(em.mergeQry);
+
+ currKeyTypeId = keyTypeId;
+
+ fromIdx += prepared;
+
+ prepared = 0;
+ }
+
+ int i = fillKeyParameters(mergeStmt, em, key);
+
+ fillValueParameters(mergeStmt, i, em, entry.getValue());
+
+ mergeStmt.addBatch();
+
+ if (++prepared % batchSz == 0) {
+ if (log.isDebugEnabled())
+ log.debug("Write entries to db [cache name=" + cacheName +
+ ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+ executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
+
+ fromIdx += prepared;
+
+ prepared = 0;
+ }
+ }
+
+ if (mergeStmt != null && prepared % batchSz != 0) {
+ if (log.isDebugEnabled())
+ log.debug("Write entries to db [cache name=" + cacheName +
+ ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+ executeBatch(em, mergeStmt, "writeAll", fromIdx, prepared, lazyEntries);
+
+ }
+ }
+ finally {
+ U.closeQuiet(mergeStmt);
+ }
+ }
+ else {
+ log.debug("Write entries to db one by one using update and insert statements [cache name=" +
+ cacheName + ", count=" + entries.size() + "]");
+
+ PreparedStatement insStmt = null;
+
+ PreparedStatement updStmt = null;
+
+ try {
+ for (Cache.Entry<? extends K, ? extends V> entry : entries) {
+ K key = entry.getKey();
+
+ Object keyTypeId = typeIdForObject(key);
+
+ EntryMapping em = entryMapping(cacheName, keyTypeId, key);
+
+ if (currKeyTypeId == null || !currKeyTypeId.equals(keyTypeId)) {
+ U.closeQuiet(insStmt);
+
+ insStmt = conn.prepareStatement(em.insQry);
+
+ U.closeQuiet(updStmt);
+
+ updStmt = conn.prepareStatement(em.updQry);
+
+ currKeyTypeId = keyTypeId;
+ }
+
+ writeUpsert(insStmt, updStmt, em, entry);
+ }
+ }
+ finally {
+ U.closeQuiet(insStmt);
+
+ U.closeQuiet(updStmt);
+ }
+ }
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to write entries in database", e);
+ }
+ finally {
+ closeConnection(conn);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ assert key != null;
+
+ EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key), key);
+
+ if (log.isDebugEnabled())
+ log.debug("Remove value from db [table=" + em.fullTableName() + ", key=" + key + "]");
+
+ Connection conn = null;
+
+ PreparedStatement stmt = null;
+
+ try {
+ conn = connection();
+
+ stmt = conn.prepareStatement(em.remQry);
+
+ fillKeyParameters(stmt, em, key);
+
+ int delCnt = stmt.executeUpdate();
+
+ if (delCnt != 1)
+ U.warn(log, "Unexpected number of deleted entries [table=" + em.fullTableName() + ", key=" + key +
+ ", expected=1, actual=" + delCnt + "]");
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to remove value from database [table=" + em.fullTableName() +
+ ", key=" + key + "]", e);
+ }
+ finally {
+ end(conn, stmt);
+ }
+ }
+
+ /**
+ * @param em Entry mapping.
+ * @param stmt Statement.
+ * @param desc Statement description for error message.
+ * @param fromIdx Objects in batch start from index.
+ * @param prepared Expected objects in batch.
+ * @param lazyObjs All objects used in batch statement as array.
+ * @throws SQLException If failed to execute batch statement.
+ */
+ private void executeBatch(EntryMapping em, Statement stmt, String desc, int fromIdx, int prepared,
+ LazyValue<Object[]> lazyObjs) throws SQLException {
+ try {
+ int[] rowCounts = stmt.executeBatch();
+
+ int numOfRowCnt = rowCounts.length;
+
+ if (numOfRowCnt != prepared)
+ U.warn(log, "Unexpected number of updated rows [table=" + em.fullTableName() + ", expected=" + prepared +
+ ", actual=" + numOfRowCnt + "]");
+
+ for (int i = 0; i < numOfRowCnt; i++) {
+ int cnt = rowCounts[i];
+
+ if (cnt != 1 && cnt != SUCCESS_NO_INFO) {
+ Object[] objs = lazyObjs.value();
+
+ U.warn(log, "Batch " + desc + " returned unexpected updated row count [table=" + em.fullTableName() +
+ ", entry=" + objs[fromIdx + i] + ", expected=1, actual=" + cnt + "]");
+ }
+ }
+ }
+ catch (BatchUpdateException be) {
+ int[] rowCounts = be.getUpdateCounts();
+
+ for (int i = 0; i < rowCounts.length; i++) {
+ if (rowCounts[i] == EXECUTE_FAILED) {
+ Object[] objs = lazyObjs.value();
+
+ U.warn(log, "Batch " + desc + " failed on execution [table=" + em.fullTableName() +
+ ", entry=" + objs[fromIdx + i] + "]");
+ }
+ }
+
+ throw be;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteAll(final Collection<?> keys) throws CacheWriterException {
+ assert keys != null;
+
+ Connection conn = null;
+
+ try {
+ conn = connection();
+
+ LazyValue<Object[]> lazyKeys = new LazyValue<Object[]>() {
+ @Override public Object[] create() {
+ return keys.toArray();
+ }
+ };
+
+ String cacheName = session().cacheName();
+
+ Object currKeyTypeId = null;
+
+ EntryMapping em = null;
+
+ PreparedStatement delStmt = null;
+
+ int fromIdx = 0, prepared = 0;
+
+ for (Object key : keys) {
+ Object keyTypeId = typeIdForObject(key);
+
+ em = entryMapping(cacheName, keyTypeId, key);
+
+ if (delStmt == null) {
+ delStmt = conn.prepareStatement(em.remQry);
+
+ currKeyTypeId = keyTypeId;
+ }
+
+ if (!currKeyTypeId.equals(keyTypeId)) {
+ if (log.isDebugEnabled())
+ log.debug("Delete entries from db [cache name=" + cacheName +
+ ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+ executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
+
+ fromIdx += prepared;
+
+ prepared = 0;
+
+ currKeyTypeId = keyTypeId;
+ }
+
+ fillKeyParameters(delStmt, em, key);
+
+ delStmt.addBatch();
+
+ if (++prepared % batchSz == 0) {
+ if (log.isDebugEnabled())
+ log.debug("Delete entries from db [cache name=" + cacheName +
+ ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+ executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
+
+ fromIdx += prepared;
+
+ prepared = 0;
+ }
+ }
+
+ if (delStmt != null && prepared % batchSz != 0) {
+ if (log.isDebugEnabled())
+ log.debug("Delete entries from db [cache name=" + cacheName +
+ ", key type=" + em.keyType() + ", count=" + prepared + "]");
+
+ executeBatch(em, delStmt, "deleteAll", fromIdx, prepared, lazyKeys);
+ }
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to remove values from database", e);
+ }
+ finally {
+ closeConnection(conn);
+ }
+ }
+
+ /**
+ * Sets the value of the designated parameter using the given object.
+ *
+ * @param stmt Prepare statement.
+ * @param i Index for parameters.
+ * @param field Field descriptor.
+ * @param fieldVal Field value.
+ * @throws CacheException If failed to set statement parameter.
+ */
+ private void fillParameter(PreparedStatement stmt, int i, CacheJdbcPojoStoreTypeField field, @Nullable Object fieldVal)
+ throws CacheException {
+ try {
+ if (fieldVal != null) {
+ if (field.getJavaFieldType() == UUID.class) {
+ switch (field.getDatabaseFieldType()) {
+ case Types.BINARY:
+ fieldVal = U.uuidToBytes((UUID)fieldVal);
+
+ break;
+ case Types.CHAR:
+ case Types.VARCHAR:
+ fieldVal = fieldVal.toString();
+
+ break;
+ }
+ }
+
+ stmt.setObject(i, fieldVal);
+ }
+ else
+ stmt.setNull(i, field.getDatabaseFieldType());
+ }
+ catch (SQLException e) {
+ throw new CacheException("Failed to set statement parameter name: " + field.getDatabaseFieldName(), e);
+ }
+ }
+
+ /**
+ * @param stmt Prepare statement.
+ * @param idx Start index for parameters.
+ * @param em Entry mapping.
+ * @param key Key object.
+ * @return Next index for parameters.
+ * @throws CacheException If failed to set statement parameters.
+ */
+ private int fillKeyParameters(PreparedStatement stmt, int idx, EntryMapping em,
+ Object key) throws CacheException {
+ for (CacheJdbcPojoStoreTypeField field : em.keyColumns()) {
+ Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaFieldName(), key);
+
+ fillParameter(stmt, idx++, field, fieldVal);
+ }
+
+ return idx;
+ }
+
+ /**
+ * @param stmt Prepare statement.
+ * @param m Type mapping description.
+ * @param key Key object.
+ * @return Next index for parameters.
+ * @throws CacheException If failed to set statement parameters.
+ */
+ private int fillKeyParameters(PreparedStatement stmt, EntryMapping m, Object key) throws CacheException {
+ return fillKeyParameters(stmt, 1, m, key);
+ }
+
+ /**
+ * @param stmt Prepare statement.
+ * @param idx Start index for parameters.
+ * @param em Type mapping description.
+ * @param val Value object.
+ * @return Next index for parameters.
+ * @throws CacheException If failed to set statement parameters.
+ */
+ private int fillValueParameters(PreparedStatement stmt, int idx, EntryMapping em, Object val)
+ throws CacheWriterException {
+ for (CacheJdbcPojoStoreTypeField field : em.uniqValFields) {
+ Object fieldVal = extractParameter(em.cacheName, em.valueType(), field.getJavaFieldName(), val);
+
+ fillParameter(stmt, idx++, field, fieldVal);
+ }
+
+ return idx;
+ }
+
+ /**
+ * @return Data source.
+ */
+ public DataSource getDataSource() {
+ return dataSrc;
+ }
+
+ /**
+ * @param dataSrc Data source.
+ */
+ public void setDataSource(DataSource dataSrc) {
+ this.dataSrc = dataSrc;
+ }
+
+ /**
+ * Get database dialect.
+ *
+ * @return Database dialect.
+ */
+ public JdbcDialect getDialect() {
+ return dialect;
+ }
+
+ /**
+ * Set database dialect.
+ *
+ * @param dialect Database dialect.
+ */
+ public void setDialect(JdbcDialect dialect) {
+ this.dialect = dialect;
+ }
+
+ /**
+ * Get Max workers thread count. These threads are responsible for execute query.
+ *
+ * @return Max workers thread count.
+ */
+ public int getMaximumPoolSize() {
+ return maxPoolSz;
+ }
+
+ /**
+ * Gets maximum number of write attempts in case of database error.
+ *
+ * @return Maximum number of write attempts.
+ */
+ public int getMaximumWriteAttempts() {
+ return maxWrtAttempts;
+ }
+
+ /**
+ * Sets maximum number of write attempts in case of database error.
+ *
+ * @param maxWrtAttempts Number of write attempts.
+ * @return {@code This} for chaining.
+ */
+ public CacheJdbcPojoStore<K, V> setMaximumWriteAttempts(int maxWrtAttempts) {
+ this.maxWrtAttempts = maxWrtAttempts;
+
+ return this;
+ }
+
+ /**
+ * Gets types known by store.
+ *
+ * @return Types known by store.
+ */
+ public CacheJdbcPojoStoreType[] getTypes() {
+ return types;
+ }
+
+ /**
+ * Sets store configurations.
+ *
+ * @param types Store should process.
+ * @return {@code This} for chaining.
+ */
+ public CacheJdbcPojoStore<K, V> setTypes(CacheJdbcPojoStoreType... types) {
+ this.types = types;
+
+ return this;
+ }
+
+ /**
+ * Set Max workers thread count. These threads are responsible for execute query.
+ *
+ * @param maxPoolSz Max workers thread count.
+ */
+ public void setMaximumPoolSize(int maxPoolSz) {
+ this.maxPoolSz = maxPoolSz;
+ }
+
+ /**
+ * Get maximum batch size for delete and delete operations.
+ *
+ * @return Maximum batch size.
+ */
+ public int getBatchSize() {
+ return batchSz;
+ }
+
+ /**
+ * Set maximum batch size for write and delete operations.
+ *
+ * @param batchSz Maximum batch size.
+ */
+ public void setBatchSize(int batchSz) {
+ this.batchSz = batchSz;
+ }
+
+ /**
+ * Parallel load cache minimum row count threshold.
+ *
+ * @return If {@code 0} then load sequentially.
+ */
+ public int getParallelLoadCacheMinimumThreshold() {
+ return parallelLoadCacheMinThreshold;
+ }
+
+ /**
+ * Parallel load cache minimum row count threshold.
+ *
+ * @param parallelLoadCacheMinThreshold Minimum row count threshold. If {@code 0} then load sequentially.
+ */
+ public void setParallelLoadCacheMinimumThreshold(int parallelLoadCacheMinThreshold) {
+ this.parallelLoadCacheMinThreshold = parallelLoadCacheMinThreshold;
+ }
+
+ /**
+ * @return Ignite instance.
+ */
+ private Ignite ignite() {
+ return ignite;
+ }
+
+ /**
+ * @return Store session.
+ */
+ private CacheStoreSession session() {
+ return ses;
+ }
+
/**
* POJO methods cache.
*/
- protected static class PojoMethodsCache {
+ private static class PojoMethodsCache {
/** POJO class. */
- protected final Class<?> cls;
+ private final Class<?> cls;
/** Constructor for POJO object. */
private Constructor ctor;
@@ -67,10 +1845,9 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
*
* @param clsName Class name.
* @param fields Fields.
- *
* @throws CacheException If failed to construct type cache.
*/
- public PojoMethodsCache(String clsName, Collection<CacheTypeFieldMetadata> fields) throws CacheException {
+ public PojoMethodsCache(String clsName, CacheJdbcPojoStoreTypeField[] fields) throws CacheException {
try {
cls = Class.forName(clsName);
@@ -89,32 +1866,32 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
throw new CacheException("Failed to find default constructor for class: " + clsName, e);
}
- setters = U.newHashMap(fields.size());
+ setters = U.newHashMap(fields.length);
- getters = U.newHashMap(fields.size());
+ getters = U.newHashMap(fields.length);
- for (CacheTypeFieldMetadata field : fields) {
- String prop = capitalFirst(field.getJavaName());
+ for (CacheJdbcPojoStoreTypeField field : fields) {
+ String prop = capitalFirst(field.getJavaFieldName());
try {
- getters.put(field.getJavaName(), cls.getMethod("get" + prop));
+ getters.put(field.getJavaFieldName(), cls.getMethod("get" + prop));
}
catch (NoSuchMethodException ignored) {
try {
- getters.put(field.getJavaName(), cls.getMethod("is" + prop));
+ getters.put(field.getJavaFieldName(), cls.getMethod("is" + prop));
}
catch (NoSuchMethodException e) {
throw new CacheException("Failed to find getter in POJO class [clsName=" + clsName +
- ", prop=" + field.getJavaName() + "]", e);
+ ", prop=" + field.getJavaFieldName() + "]", e);
}
}
try {
- setters.put(field.getJavaName(), cls.getMethod("set" + prop, field.getJavaType()));
+ setters.put(field.getJavaFieldName(), cls.getMethod("set" + prop, field.getJavaFieldType()));
}
catch (NoSuchMethodException e) {
throw new CacheException("Failed to find setter in POJO class [clsName=" + clsName +
- ", prop=" + field.getJavaName() + "]", e);
+ ", prop=" + field.getJavaFieldName() + "]", e);
}
}
}
@@ -131,116 +1908,366 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
}
}
- /** Methods cache. */
- protected volatile Map<String, Map<String, PojoMethodsCache>> mtdsCache = Collections.emptyMap();
+ /**
+ * Entry mapping description.
+ */
+ private static class EntryMapping {
+ /** Cache name. */
+ private final String cacheName;
- /** {@inheritDoc} */
- @Override protected void prepareBuilders(@Nullable String cacheName, Collection<CacheTypeMetadata> types)
- throws CacheException {
- Map<String, PojoMethodsCache> typeMethods = U.newHashMap(types.size() * 2);
+ /** Database dialect. */
+ private final JdbcDialect dialect;
- for (CacheTypeMetadata type : types) {
- String keyType = type.getKeyType();
- typeMethods.put(keyType, new PojoMethodsCache(keyType, type.getKeyFields()));
+ /** Select border for range queries. */
+ private final String loadCacheSelRangeQry;
- String valType = type.getValueType();
- typeMethods.put(valType, new PojoMethodsCache(valType, type.getValueFields()));
- }
+ /** Select all items query. */
+ private final String loadCacheQry;
- Map<String, Map<String, PojoMethodsCache>> newMtdsCache = new HashMap<>(mtdsCache);
+ /** Select item query. */
+ private final String loadQrySingle;
- newMtdsCache.put(cacheName, typeMethods);
+ /** Select items query. */
+ private final String loadQry;
- mtdsCache = newMtdsCache;
- }
+ /** Merge item(s) query. */
+ private final String mergeQry;
- /** {@inheritDoc} */
- @Override protected <R> R buildObject(String cacheName, String typeName, Collection<CacheTypeFieldMetadata> fields,
- Map<String, Integer> loadColIdxs, ResultSet rs) throws CacheLoaderException {
- PojoMethodsCache mc = mtdsCache.get(cacheName).get(typeName);
+ /** Update item query. */
+ private final String insQry;
- if (mc == null)
- throw new CacheLoaderException("Failed to find cache type metadata for type: " + typeName);
+ /** Update item query. */
+ private final String updQry;
- try {
- if (mc.simple) {
- CacheTypeFieldMetadata field = F.first(fields);
+ /** Remove item(s) query. */
+ private final String remQry;
- return (R)getColumnValue(rs, loadColIdxs.get(field.getDatabaseName()), mc.cls);
- }
+ /** Max key count for load query per statement. */
+ private final int maxKeysPerStmt;
- Object obj = mc.ctor.newInstance();
+ /** Database key columns. */
+ private final Collection<String> keyCols;
- for (CacheTypeFieldMetadata field : fields) {
- String fldJavaName = field.getJavaName();
+ /** Database unique value columns. */
+ private final Collection<String> cols;
- Method setter = mc.setters.get(fldJavaName);
+ /** Select query columns index. */
+ private final Map<String, Integer> loadColIdxs;
- if (setter == null)
- throw new IllegalStateException("Failed to find setter in POJO class [clsName=" + typeName +
- ", prop=" + fldJavaName + "]");
+ /** Unique value fields. */
+ private final Collection<CacheJdbcPojoStoreTypeField> uniqValFields;
- String fldDbName = field.getDatabaseName();
+ /** Type metadata. */
+ private final CacheJdbcPojoStoreType typeMeta;
- Integer colIdx = loadColIdxs.get(fldDbName);
+ /** Full table name. */
+ private final String fullTblName;
- try {
- setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaType()));
+ /**
+ * @param cacheName Cache name.
+ * @param dialect JDBC dialect.
+ * @param typeMeta Type metadata.
+ */
+ public EntryMapping(@Nullable String cacheName, JdbcDialect dialect, CacheJdbcPojoStoreType typeMeta) {
+ this.cacheName = cacheName;
+
+ this.dialect = dialect;
+
+ this.typeMeta = typeMeta;
+
+ CacheJdbcPojoStoreTypeField[] keyFields = typeMeta.getKeyFields();
+
+ CacheJdbcPojoStoreTypeField[] valFields = typeMeta.getValueFields();
+
+ keyCols = databaseColumns(F.asList(keyFields));
+
+ uniqValFields = F.view(F.asList(valFields), new IgnitePredicate<CacheJdbcPojoStoreTypeField>() {
+ @Override public boolean apply(CacheJdbcPojoStoreTypeField col) {
+ return !keyCols.contains(col.getDatabaseFieldName());
}
- catch (Exception e) {
- throw new IllegalStateException("Failed to set property in POJO class [clsName=" + typeName +
- ", prop=" + fldJavaName + ", col=" + colIdx + ", dbName=" + fldDbName + "]", e);
+ });
+
+ String schema = typeMeta.getDatabaseSchema();
+
+ String tblName = typeMeta.getDatabaseTable();
+
+ fullTblName = F.isEmpty(schema) ? tblName : schema + "." + tblName;
+
+ Collection<String> uniqValCols = databaseColumns(uniqValFields);
+
+ cols = F.concat(false, keyCols, uniqValCols);
+
+ loadColIdxs = U.newHashMap(cols.size());
+
+ int idx = 1;
+
+ for (String col : cols)
+ loadColIdxs.put(col, idx++);
+
+ loadCacheQry = dialect.loadCacheQuery(fullTblName, cols);
+
+ loadCacheSelRangeQry = dialect.loadCacheSelectRangeQuery(fullTblName, keyCols);
+
+ loadQrySingle = dialect.loadQuery(fullTblName, keyCols, cols, 1);
+
+ maxKeysPerStmt = dialect.getMaxParameterCount() / keyCols.size();
+
+ loadQry = dialect.loadQuery(fullTblName, keyCols, cols, maxKeysPerStmt);
+
+ insQry = dialect.insertQuery(fullTblName, keyCols, uniqValCols);
+
+ updQry = dialect.updateQuery(fullTblName, keyCols, uniqValCols);
+
+ mergeQry = dialect.mergeQuery(fullTblName, keyCols, uniqValCols);
+
+ remQry = dialect.removeQuery(fullTblName, keyCols);
+ }
+
+ /**
+ * Extract database column names from {@link CacheJdbcPojoStoreTypeField}.
+ *
+ * @param dsc collection of {@link CacheJdbcPojoStoreTypeField}.
+ * @return Collection with database column names.
+ */
+ private static Collection<String> databaseColumns(Collection<CacheJdbcPojoStoreTypeField> dsc) {
+ return F.transform(dsc, new C1<CacheJdbcPojoStoreTypeField, String>() {
+ /** {@inheritDoc} */
+ @Override public String apply(CacheJdbcPojoStoreTypeField col) {
+ return col.getDatabaseFieldName();
}
- }
+ });
+ }
+
+ /**
+ * Construct query for select values with key count less or equal {@code maxKeysPerStmt}
+ *
+ * @param keyCnt Key count.
+ * @return Load query statement text.
+ */
+ private String loadQuery(int keyCnt) {
+ assert keyCnt <= maxKeysPerStmt;
+
+ if (keyCnt == maxKeysPerStmt)
+ return loadQry;
- return (R)obj;
+ if (keyCnt == 1)
+ return loadQrySingle;
+
+ return dialect.loadQuery(fullTblName, keyCols, cols, keyCnt);
}
- catch (SQLException e) {
- throw new CacheLoaderException("Failed to read object of class: " + typeName, e);
+
+ /**
+ * Construct query for select values in range.
+ *
+ * @param appendLowerBound Need add lower bound for range.
+ * @param appendUpperBound Need add upper bound for range.
+ * @return Query with range.
+ */
+ private String loadCacheRangeQuery(boolean appendLowerBound, boolean appendUpperBound) {
+ return dialect.loadCacheRangeQuery(fullTblName, keyCols, cols, appendLowerBound, appendUpperBound);
}
- catch (Exception e) {
- throw new CacheLoaderException("Failed to construct instance of class: " + typeName, e);
+
+ /**
+ * @return Key type.
+ */
+ protected String keyType() {
+ return typeMeta.getKeyType();
}
- }
- /** {@inheritDoc} */
- @Nullable @Override protected Object extractParameter(String cacheName, String typeName, String fieldName,
- Object obj)
- throws CacheException {
- try {
- PojoMethodsCache mc = mtdsCache.get(cacheName).get(typeName);
+ /**
+ * @return Value type.
+ */
+ protected String valueType() {
+ return typeMeta.getValueType();
+ }
- if (mc == null)
- throw new CacheException("Failed to find cache type metadata for type: " + typeName);
+ /**
+ * Gets key columns.
+ *
+ * @return Key columns.
+ */
+ protected CacheJdbcPojoStoreTypeField[] keyColumns() {
+ return typeMeta.getKeyFields();
+ }
- if (mc.simple)
- return obj;
+ /**
+ * Gets value columns.
+ *
+ * @return Value columns.
+ */
+ protected CacheJdbcPojoStoreTypeField[] valueColumns() {
+ return typeMeta.getValueFields();
+ }
- Method getter = mc.getters.get(fieldName);
+ /**
+ * Get full table name.
+ *
+ * @return <schema>.<table name>
+ */
+ protected String fullTableName() {
+ return fullTblName;
+ }
+ }
- if (getter == null)
- throw new CacheLoaderException("Failed to find getter in POJO class [clsName=" + typeName +
- ", prop=" + fieldName + "]");
+ /**
+ * Worker for load cache using custom user query.
+ *
+ * @param <K1> Key type.
+ * @param <V1> Value type.
+ */
+ private class LoadCacheCustomQueryWorker<K1, V1> implements Callable<Void> {
+ /** Entry mapping description. */
+ private final EntryMapping em;
- return getter.invoke(obj);
+ /** User query. */
+ private final String qry;
+
+ /** Closure for loaded values. */
+ private final IgniteBiInClosure<K1, V1> clo;
+
+ /**
+ * @param em Entry mapping description.
+ * @param qry User query.
+ * @param clo Closure for loaded values.
+ */
+ private LoadCacheCustomQueryWorker(EntryMapping em, String qry, IgniteBiInClosure<K1, V1> clo) {
+ this.em = em;
+ this.qry = qry;
+ this.clo = clo;
}
- catch (Exception e) {
- throw new CacheException("Failed to read object of class: " + typeName, e);
+
+ /** {@inheritDoc} */
+ @Override public Void call() throws Exception {
+ if (log.isDebugEnabled())
+ log.debug("Load cache using custom query [cache name= " + em.cacheName +
+ ", key type=" + em.keyType() + ", query=" + qry + "]");
+
+ Connection conn = null;
+
+ PreparedStatement stmt = null;
+
+ try {
+ conn = openConnection(true);
+
+ stmt = conn.prepareStatement(qry);
+
+ ResultSet rs = stmt.executeQuery();
+
+ ResultSetMetaData meta = rs.getMetaData();
+
+ Map<String, Integer> colIdxs = U.newHashMap(meta.getColumnCount());
+
+ for (int i = 1; i <= meta.getColumnCount(); i++)
+ colIdxs.put(meta.getColumnLabel(i), i);
+
+ while (rs.next()) {
+ K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), colIdxs, rs);
+ V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), colIdxs, rs);
+
+ clo.apply(key, val);
+ }
+
+ return null;
+ }
+ catch (SQLException e) {
+ throw new CacheLoaderException("Failed to execute custom query for load cache", e);
+ }
+ finally {
+ U.closeQuiet(stmt);
+
+ U.closeQuiet(conn);
+ }
}
}
- /** {@inheritDoc} */
- @Override protected Object keyTypeId(Object key) throws CacheException {
- return key.getClass();
+ /**
+ * Lazy initialization of value.
+ *
+ * @param <T> Cached object type
+ */
+ private abstract static class LazyValue<T> {
+ /** Cached value. */
+ private T val;
+
+ /**
+ * @return Construct value.
+ */
+ protected abstract T create();
+
+ /**
+ * @return Value.
+ */
+ public T value() {
+ if (val == null)
+ val = create();
+
+ return val;
+ }
}
- /** {@inheritDoc} */
- @Override protected Object keyTypeId(String type) throws CacheException {
- try {
- return Class.forName(type);
+ /**
+ * Worker for load by keys.
+ *
+ * @param <K1> Key type.
+ * @param <V1> Value type.
+ */
+ private class LoadWorker<K1, V1> implements Callable<Map<K1, V1>> {
+ /** Connection. */
+ private final Connection conn;
+
+ /** Keys for load. */
+ private final Collection<K1> keys;
+
+ /** Entry mapping description. */
+ private final EntryMapping em;
+
+ /**
+ * @param conn Connection.
+ * @param em Entry mapping description.
+ */
+ private LoadWorker(Connection conn, EntryMapping em) {
+ this.conn = conn;
+ this.em = em;
+
+ keys = new ArrayList<>(em.maxKeysPerStmt);
}
- catch (ClassNotFoundException e) {
- throw new CacheException("Failed to find class: " + type, e);
+
+ /** {@inheritDoc} */
+ @Override public Map<K1, V1> call() throws Exception {
+ if (log.isDebugEnabled())
+ log.debug("Load values from db [table= " + em.fullTableName() +
+ ", key count=" + keys.size() + "]");
+
+ PreparedStatement stmt = null;
+
+ try {
+ stmt = conn.prepareStatement(em.loadQuery(keys.size()));
+
+ int idx = 1;
+
+ for (Object key : keys)
+ for (CacheJdbcPojoStoreTypeField field : em.keyColumns()) {
+ Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaFieldName(), key);
+
+ fillParameter(stmt, idx++, field, fieldVal);
+ }
+
+ ResultSet rs = stmt.executeQuery();
+
+ Map<K1, V1> entries = U.newHashMap(keys.size());
+
+ while (rs.next()) {
+ K1 key = buildObject(em.cacheName, em.keyType(), em.keyColumns(), em.loadColIdxs, rs);
+ V1 val = buildObject(em.cacheName, em.valueType(), em.valueColumns(), em.loadColIdxs, rs);
+
+ entries.put(key, val);
+ }
+
+ return entries;
+ }
+ finally {
+ U.closeQuiet(stmt);
+ }
}
}
-}
\ No newline at end of file
+}