You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/05/31 12:28:01 UTC

[15/27] ignite git commit: ignite-4220 Support statements for JDBC and Cassandra store

ignite-4220 Support statements for JDBC and Cassandra store


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/25c06b50
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/25c06b50
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/25c06b50

Branch: refs/heads/ignite-5232-1.7.2
Commit: 25c06b50d46937cb39534cdf4147b862217289a2
Parents: 075bcfc
Author: rfqu <rf...@list.ru>
Authored: Tue May 2 19:46:44 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Thu May 4 14:52:34 2017 +0300

----------------------------------------------------------------------
 .../store/cassandra/CassandraCacheStore.java    | 16 ++++-
 .../session/LoadCacheCustomQueryWorker.java     | 26 +++++--
 .../ignite/tests/IgnitePersistentStoreTest.java | 23 +++++--
 .../store/jdbc/CacheAbstractJdbcStore.java      | 72 ++++++++++++++------
 .../CacheJdbcPojoStoreAbstractSelfTest.java     | 49 +++++++++++++
 5 files changed, 152 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/25c06b50/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index e8da3a7..2e1d3ea 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cache.store.cassandra;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -103,10 +104,19 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             CassandraSession ses = getCassandraSession();
 
             for (Object obj : args) {
-                if (obj == null || !(obj instanceof String) || !((String)obj).trim().toLowerCase().startsWith("select"))
-                    continue;
+                LoadCacheCustomQueryWorker<K, V> task = null;
 
-                futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo)));
+                if (obj instanceof Statement)
+                    task = new LoadCacheCustomQueryWorker<>(ses, (Statement)obj, controller, log, clo);
+                else if (obj instanceof String) {
+                    String qry = ((String)obj).trim();
+
+                    if (qry.toLowerCase().startsWith("select"))
+                        task = new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo);
+                }
+
+                if (task != null)
+                    futs.add(pool.submit(task));
             }
 
             for (Future<?> fut : futs)

http://git-wip-us.apache.org/repos/asf/ignite/blob/25c06b50/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
index d3ace7d..d186b98 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java
@@ -36,8 +36,8 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
     /** Cassandra session to execute CQL query */
     private final CassandraSession ses;
 
-    /** User query. */
-    private final String qry;
+    /** Statement. */
+    private final Statement stmt;
 
     /** Persistence controller */
     private final PersistenceController ctrl;
@@ -49,12 +49,28 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
     private final IgniteBiInClosure<K, V> clo;
 
     /**
+     * @param ses Session.
+     * @param qry Query.
+     * @param ctrl Control.
+     * @param log Logger.
      * @param clo Closure for loaded values.
      */
     public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController ctrl,
-        IgniteLogger log, IgniteBiInClosure<K, V> clo) {
+                                      IgniteLogger log, IgniteBiInClosure<K, V> clo) {
+        this(ses, new SimpleStatement(qry.trim().endsWith(";") ? qry : qry + ';'), ctrl, log, clo);
+    }
+
+    /**
+     * @param ses Session.
+     * @param stmt Statement.
+     * @param ctrl Control.
+     * @param log Logger.
+     * @param clo Closure for loaded values.
+     */
+    public LoadCacheCustomQueryWorker(CassandraSession ses, Statement stmt, PersistenceController ctrl,
+                                      IgniteLogger log, IgniteBiInClosure<K, V> clo) {
         this.ses = ses;
-        this.qry = qry.trim().endsWith(";") ? qry : qry + ";";
+        this.stmt = stmt;
         this.ctrl = ctrl;
         this.log = log;
         this.clo = clo;
@@ -70,7 +86,7 @@ public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> {
 
             /** {@inheritDoc} */
             @Override public Statement getStatement() {
-                return new SimpleStatement(qry);
+                return stmt;
             }
 
             /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/25c06b50/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
index 5da6ba2..51d0885 100644
--- a/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.tests;
 
+import com.datastax.driver.core.SimpleStatement;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.ignite.Ignite;
@@ -35,6 +36,7 @@ import org.apache.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.Assert;
 import org.springframework.core.io.ClassPathResource;
 
 /**
@@ -346,20 +348,29 @@ public class IgnitePersistentStoreTest {
         LOGGER.info("Running loadCache test");
 
         try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
-            IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3"));
+            CacheConfiguration<PersonId, Person> ccfg = new CacheConfiguration<>("cache3");
+
+            IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(ccfg);
+
             int size = personCache3.size(CachePeekMode.ALL);
 
             LOGGER.info("Initial cache size " + size);
 
             LOGGER.info("Loading cache data from Cassandra table");
 
-            personCache3.loadCache(null, new String[] {"select * from test1.pojo_test3 limit 3"});
+            String qry = "select * from test1.pojo_test3 limit 3";
+
+            personCache3.loadCache(null, qry);
 
             size = personCache3.size(CachePeekMode.ALL);
-            if (size != 3) {
-                throw new RuntimeException("Cache data was incorrectly loaded from Cassandra. " +
-                    "Expected number of records is 3, but loaded number of records is " + size);
-            }
+            Assert.assertEquals("Cache data was incorrectly loaded from Cassandra table by '" + qry + "'", 3, size);
+
+            personCache3.clear();
+
+            personCache3.loadCache(null, new SimpleStatement(qry));
+
+            size = personCache3.size(CachePeekMode.ALL);
+            Assert.assertEquals("Cache data was incorrectly loaded from Cassandra table by statement", 3, size);
 
             LOGGER.info("Cache data loaded from Cassandra table");
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/25c06b50/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index e7ce526..e211fad 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -81,7 +81,6 @@ import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_
 import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_PARALLEL_LOAD_CACHE_MINIMUM_THRESHOLD;
 import static org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory.DFLT_WRITE_ATTEMPTS;
 import static org.apache.ignite.cache.store.jdbc.JdbcTypesTransformer.NUMERIC_TYPES;
-import static org.apache.ignite.cache.store.jdbc.JdbcTypesTransformer.NUMERIC_TYPES;
 
 /**
  * Implementation of {@link CacheStore} backed by JDBC.
@@ -753,17 +752,34 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                         }
                     }))
                         throw new CacheLoaderException("Provided key type is not found in store or cache configuration " +
-                            "[cache=" + U.maskName(cacheName) + ", key=" + keyType + "]");
-
-                    String qry = args[i + 1].toString();
+                            "[cache=" + U.maskName(cacheName) + ", key=" + keyType + ']');
 
                     EntryMapping em = entryMapping(cacheName, typeIdForTypeName(kindForName(keyType), keyType));
 
-                    if (log.isInfoEnabled())
-                        log.info("Started load cache using custom query [cache=" + U.maskName(cacheName) +
-                            ", keyType=" + keyType + ", query=" + qry + "]");
+                    Object arg = args[i + 1];
+
+                    LoadCacheCustomQueryWorker<K, V> task;
+
+                    if (arg instanceof PreparedStatement) {
+                        PreparedStatement stmt = (PreparedStatement)arg;
+
+                        if (log.isInfoEnabled())
+                            log.info("Started load cache using custom statement [cache=" + U.maskName(cacheName) +
+                                      ", keyType=" + keyType + ", stmt=" + stmt + ']');
+
+                        task = new LoadCacheCustomQueryWorker<>(em, stmt, clo);
+                    }
+                    else {
+                        String qry = arg.toString();
+
+                        if (log.isInfoEnabled())
+                              log.info("Started load cache using custom query [cache=" + U.maskName(cacheName) +
+                                  ", keyType=" + keyType + ", query=" + qry + ']');
+
+                        task = new LoadCacheCustomQueryWorker<>(em, qry, clo);
+                    }
 
-                    futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(em, qry, clo)));
+                    futs.add(pool.submit(task));
                 }
             }
             else {
@@ -778,7 +794,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                     processedKeyTypes.add(keyType);
 
                     if (log.isInfoEnabled())
-                        log.info("Started load cache [cache=" + U.maskName(cacheName) + ", keyType=" + keyType + "]");
+                        log.info("Started load cache [cache=" + U.maskName(cacheName) + ", keyType=" + keyType + ']');
 
                     if (parallelLoadCacheMinThreshold > 0) {
                         Connection conn = null;
@@ -795,7 +811,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                             if (rs.next()) {
                                 if (log.isDebugEnabled())
                                     log.debug("Multithread loading entries from db [cache=" + U.maskName(cacheName) +
-                                        ", keyType=" + keyType + "]");
+                                        ", keyType=" + keyType + ']');
 
                                 int keyCnt = em.keyCols.size();
 
@@ -824,7 +840,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                         }
                         catch (SQLException e) {
                             log.warning("Failed to load entries from db in multithreaded mode, will try in single thread " +
-                                "[cache=" + U.maskName(cacheName) + ", keyType=" + keyType + " ]", e);
+                                "[cache=" + U.maskName(cacheName) + ", keyType=" + keyType + ']', e);
                         }
                         finally {
                             U.closeQuiet(conn);
@@ -833,7 +849,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
 
                     if (log.isDebugEnabled())
                         log.debug("Single thread loading entries from db [cache=" + U.maskName(cacheName) +
-                            ", keyType=" + keyType + "]");
+                            ", keyType=" + keyType + ']');
 
                     futs.add(pool.submit(loadCacheFull(em, clo)));
                 }
@@ -860,7 +876,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
         EntryMapping em = entryMapping(session().cacheName(), typeIdForObject(key));
 
         if (log.isDebugEnabled())
-            log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + "]");
+            log.debug("Load value from db [table= " + em.fullTableName() + ", key=" + key + ']');
 
         Connection conn = null;
 
@@ -1954,14 +1970,28 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
         /** Entry mapping description. */
         private final EntryMapping em;
 
+        /** User statement. */
+        private PreparedStatement stmt;
+
         /** User query. */
-        private final String qry;
+        private String qry;
 
         /** Closure for loaded values. */
         private final IgniteBiInClosure<K1, V1> clo;
 
         /**
          * @param em Entry mapping description.
+         * @param stmt User statement.
+         * @param clo Closure for loaded values.
+         */
+        private LoadCacheCustomQueryWorker(EntryMapping em, PreparedStatement stmt, IgniteBiInClosure<K1, V1> clo) {
+            this.em = em;
+            this.stmt = stmt;
+            this.clo = clo;
+        }
+
+        /**
+         * @param em Entry mapping description.
          * @param qry User query.
          * @param clo Closure for loaded values.
          */
@@ -1975,12 +2005,12 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
         @Override public Void call() throws Exception {
             Connection conn = null;
 
-            PreparedStatement stmt = null;
-
             try {
-                conn = openConnection(true);
+                if (stmt == null) {
+                    conn = openConnection(true);
 
-                stmt = conn.prepareStatement(qry);
+                    stmt = conn.prepareStatement(qry);
+                }
 
                 stmt.setFetchSize(dialect.getFetchSize());
 
@@ -2006,9 +2036,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
                 throw new CacheLoaderException("Failed to execute custom query for load cache", e);
             }
             finally {
-                U.closeQuiet(stmt);
+                if (conn != null) {
+                    U.closeQuiet(stmt);
 
-                U.closeQuiet(conn);
+                    U.closeQuiet(conn);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/25c06b50/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
index 1de44f7..9e59769 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java
@@ -329,6 +329,55 @@ public abstract class CacheJdbcPojoStoreAbstractSelfTest extends GridCommonAbstr
     }
 
     /**
+     * Checks that data was loaded correctly with prepared statement.
+     */
+    protected void checkCacheLoadWithStatement() throws SQLException {
+        Connection conn = null;
+
+        PreparedStatement stmt = null;
+
+        try {
+            conn = getConnection();
+
+            conn.setAutoCommit(true);
+
+            String qry = "select id, org_id, name, birthday, gender from Person";
+
+            stmt = conn.prepareStatement(qry);
+
+            IgniteCache<Object, Object> c1 = grid().cache(CACHE_NAME);
+
+            c1.loadCache(null, "org.apache.ignite.cache.store.jdbc.model.PersonKey", stmt);
+
+            assertEquals(PERSON_CNT, c1.size());
+        }
+        finally {
+            U.closeQuiet(stmt);
+
+            U.closeQuiet(conn);
+        }
+
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCacheWithStatement() throws Exception {
+        startTestGrid(false, false, false, false, 512);
+
+        checkCacheLoadWithStatement();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCacheWithStatementTx() throws Exception {
+        startTestGrid(false, false, false, true, 512);
+
+        checkCacheLoadWithStatement();
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testLoadCache() throws Exception {