You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/05/31 12:28:01 UTC
[15/27] ignite git commit: ignite-4220 Support statements for JDBC
and Cassandra store
ignite-4220 Support statements for JDBC and Cassandra store
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/25c06b50
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/25c06b50
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/25c06b50
Branch: refs/heads/ignite-5232-1.7.2
Commit: 25c06b50d46937cb39534cdf4147b862217289a2
Parents: 075bcfc
Author: rfqu <rf...@list.ru>
Authored: Tue May 2 19:46:44 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Thu May 4 14:52:34 2017 +0300
----------------------------------------------------------------------
.../store/cassandra/CassandraCacheStore.java | 16 ++++-
.../session/LoadCacheCustomQueryWorker.java | 26 +++++--
.../ignite/tests/IgnitePersistentStoreTest.java | 23 +++++--
.../store/jdbc/CacheAbstractJdbcStore.java | 72 ++++++++++++++------
.../CacheJdbcPojoStoreAbstractSelfTest.java | 49 +++++++++++++
5 files changed, 152 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/25c06b50/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index e8da3a7..2e1d3ea 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cache.store.cassandra;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -103,10 +104,19 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
CassandraSession ses = getCassandraSession();
for (Object obj : args) {
- if (obj == null || !(obj instanceof String) || !((String)obj).trim().toLowerCase().startsWith("select"))
- continue;
+ LoadCacheCustomQueryWorker<K, V> task = null;
- futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo)));
+ if (obj instanceof Statement)
+ task = new LoadCacheCustomQueryWorker<>(ses, (Statement)obj, controller, log, clo);
+ else if (obj instanceof String) {
+ String qry = ((String)obj).trim();
+
+ if (qry.toLowerCase().startsWith("select"))
+ task = new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo);
+ }
+
+ if (task != null)
+ futs.add(pool.submit(task));
}
for (Future<?> fut : futs)
http://git-wip-us.apache.org/repos/asf/ignite/blob/25c06b50/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
index d3ace7d..d186b98 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
@@ -36,8 +36,8 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
/** Cassandra session to execute CQL query */
private final CassandraSession ses;
- /** User query. */
- private final String qry;
+ /** Statement. */
+ private final Statement stmt;
/** Persistence controller */
private final PersistenceController ctrl;
@@ -49,12 +49,28 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
private final IgniteBiInClosure<K, V> clo;
/**
+ * @param ses Session.
+ * @param qry Query.
+ * @param ctrl Control.
+ * @param log Logger.
* @param clo Closure for loaded values.
*/
public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController ctrl,
- IgniteLogger log, IgniteBiInClosure<K, V> clo) {
+ IgniteLogger log, IgniteBiInClosure<K, V> clo) {
+ this(ses, new SimpleStatement(qry.trim().endsWith(";") ? qry : qry + ';'), ctrl, log, clo);
+ }
+
+ /**
+ * @param ses Session.
+ * @param stmt Statement.
+ * @param ctrl Control.
+ * @param log Logger.
+ * @param clo Closure for loaded values.
+ */
+ public LoadCacheCustomQueryWorker(CassandraSession ses, Statement stmt, PersistenceController ctrl,
+ IgniteLogger log, IgniteBiInClosure<K, V> clo) {
this.ses = ses;
- this.qry = qry.trim().endsWith(";") ? qry : qry + ";";
+ this.stmt = stmt;
this.ctrl = ctrl;
this.log = log;
this.clo = clo;
@@ -70,7 +86,7 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
/** {@inheritDoc} */
@Override public Statement getStatement() {
- return new SimpleStatement(qry);
+ return stmt;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/25c06b50/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
index 5da6ba2..51d0885 100644
--- a/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.tests;
+import com.datastax.driver.core.SimpleStatement;
import java.util.Collection;
import java.util.Map;
import org.apache.ignite.Ignite;
@@ -35,6 +36,7 @@ import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.Assert;
import org.springframework.core.io.ClassPathResource;
/**
@@ -346,20 +348,29 @@ public class IgnitePersistentStoreTest {
LOGGER.info("Running loadCache test");
try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
- IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3"));
+ CacheConfiguration<PersonId, Person> ccfg = new CacheConfiguration<>("cache3");
+
+ IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(ccfg);
+
int size = personCache3.size(CachePeekMode.ALL);
LOGGER.info("Initial cache size " + size);
LOGGER.info("Loading cache data from Cassandra table");
- personCache3.loadCache(null, new String[] {"select * from test1.pojo_test3 limit 3"});
+ String qry = "select * from test1.pojo_test3 limit 3";
+
+ personCache3.loadCache(null, qry);
size = personCache3.size(CachePeekMode.ALL);
- if (size != 3) {
- throw new RuntimeException("Cache data was incorrectly loaded from Cassandra. " +
- "Expected number of records is 3, but loaded number of records is " + size);
- }
+ Assert.assertEquals("Cache data was incorrectly loaded from Cassandra table by '" + qry + "'", 3, size);
+
+ personCache3.clear();
+
+ personCache3.loadCache(null, new SimpleStatement(qry));
+
+ size = personCache3.size(CachePeekMode.ALL);
+ Assert.assertEquals("Cache data was incorrectly loaded from Cassandra table by statement", 3, size);
LOGGER.info("Cache data loaded from Cassandra table");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/25c06b50/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index e7ce526..e211fad 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -81,7 +81,6 @@ import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_
import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_WRITE_ATTEMPTS;
import static org.apache.ignite.cache.store.jdbc.JdbcTypesTransformer.NUMERIC_TYPES;
-import static org.apache.ignite.cache.store.jdbc.JdbcTypesTransformer.NUMERIC_TYPES;
/**
* Implementation of {@link CacheStore} backed by JDBC.
@@ -753,17 +752,34 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
}))
throw new CacheLoaderException("Provided key type is not found in store or cache configuration " +
- "[cache=" + U.maskName(cacheName) + ", key=" + keyType + "]");
-
- String qry = args[i + 1].toString();
+ "[cache=" + U.maskName(cacheName) + ", key=" + keyType + ']');
EntryMapping em = entryMapping(cacheName, typeIdForTypeName(kindForName(keyType), keyType));
- if (log.isInfoEnabled())
- log.info("Started load cache using custom query [cache=" + U.maskName(cacheName) +
- ", keyType=" + keyType + ", query=" + qry + "]");
+ Object arg = args[i + 1];
+
+ LoadCacheCustomQueryWorker<K, V> task;
+
+ if (arg instanceof PreparedStatement) {
+ PreparedStatement stmt = (PreparedStatement)arg;
+
+ if (log.isInfoEnabled())
+ log.info("Started load cache using custom statement [cache=" + U.maskName(cacheName) +
+ ", keyType=" + keyType + ", stmt=" + stmt + ']');
+
+ task = new LoadCacheCustomQueryWorker<>(em, stmt, clo);
+ }
+ else {
+ String qry = arg.toString();
+
+ if (log.isInfoEnabled())
+ log.info("Started load cache using custom query [cache=" + U.maskName(cacheName) +
+ ", keyType=" + keyType + ", query=" + qry + ']');
+
+ task = new LoadCacheCustomQueryWorker<>(em, qry, clo);
+ }
- futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, qry, clo)));
+ futs.add(pool.submit(task));
}
}
else {
@@ -778,7 +794,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
processedKeyTypes.add(keyType);
if (log.isInfoEnabled())
- log.info("Started load cache [cache=" + U.maskName(cacheName) + ", keyType=" + keyType + "]");
+ log.info("Started load cache [cache=" + U.maskName(cacheName) + ", keyType=" + keyType + ']');
if (parallelLoadCacheMinThreshold > 0) {
Connection conn = null;
@@ -795,7 +811,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
if (rs.next()) {
if (log.isDebugEnabled())
log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) +
- ", keyType=" + keyType + "]");
+ ", keyType=" + keyType + ']');
int keyCnt = em.keyCols.size();
@@ -824,7 +840,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
}
catch (SQLException e) {
log.warning("Failed to load entries from db in multithreaded mode, will try in single thread " +
- "[cache=" + U.maskName(cacheName) + ", keyType=" + keyType + " ]", e);
+ "[cache=" + U.maskName(cacheName) + ", keyType=" + keyType + ']', e);
}
finally {
U.closeQuiet(conn);
@@ -833,7 +849,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
if (log.isDebugEnabled())
log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) +
- ", keyType=" + keyType + "]");
+ ", keyType=" + keyType + ']');
futs.add(pool.submit(loadCacheFull(em, clo)));
}
@@ -860,7 +876,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key));
if (log.isDebugEnabled())
- log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + "]");
+ log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + ']');
Connection conn = null;
@@ -1954,14 +1970,28 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
/** Entry mapping description. */
private final EntryMapping em;
+ /** User statement. */
+ private PreparedStatement stmt;
+
/** User query. */
- private final String qry;
+ private String qry;
/** Closure for loaded values. */
private final IgniteBiInClosure<K1, V1> clo;
/**
* @param em Entry mapping description.
+ * @param stmt User statement.
+ * @param clo Closure for loaded values.
+ */
+ private LoadCacheCustomQueryWorker(EntryMapping em, PreparedStatement stmt, IgniteBiInClosure<K1, V1> clo) {
+ this.em = em;
+ this.stmt = stmt;
+ this.clo = clo;
+ }
+
+ /**
+ * @param em Entry mapping description.
* @param qry User query.
* @param clo Closure for loaded values.
*/
@@ -1975,12 +2005,12 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
@Override public Void call() throws Exception {
Connection conn = null;
- PreparedStatement stmt = null;
-
try {
- conn = openConnection(true);
+ if (stmt == null) {
+ conn = openConnection(true);
- stmt = conn.prepareStatement(qry);
+ stmt = conn.prepareStatement(qry);
+ }
stmt.setFetchSize(dialect.getFetchSize());
@@ -2006,9 +2036,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
throw new CacheLoaderException("Failed to execute custom query for load cache", e);
}
finally {
- U.closeQuiet(stmt);
+ if (conn != null) {
+ U.closeQuiet(stmt);
- U.closeQuiet(conn);
+ U.closeQuiet(conn);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/25c06b50/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
index 1de44f7..9e59769 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
@@ -329,6 +329,55 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
}
/**
+ * Checks that data was loaded correctly with prepared statement.
+ */
+ protected void checkCacheLoadWithStatement() throws SQLException {
+ Connection conn = null;
+
+ PreparedStatement stmt = null;
+
+ try {
+ conn = getConnection();
+
+ conn.setAutoCommit(true);
+
+ String qry = "select id, org_id, name, birthday, gender from Person";
+
+ stmt = conn.prepareStatement(qry);
+
+ IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
+
+ c1.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", stmt);
+
+ assertEquals(PERSON_CNT, c1.size());
+ }
+ finally {
+ U.closeQuiet(stmt);
+
+ U.closeQuiet(conn);
+ }
+
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLoadCacheWithStatement() throws Exception {
+ startTestGrid(false, false, false, false, 512);
+
+ checkCacheLoadWithStatement();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLoadCacheWithStatementTx() throws Exception {
+ startTestGrid(false, false, false, true, 512);
+
+ checkCacheLoadWithStatement();
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testLoadCache() throws Exception {