You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/23 12:06:35 UTC

[3/6] ignite git commit: Fixed "IGNITE-4205 CassandraCacheStore should start IgniteThread threads in loadCache() method"

Fixed "IGNITE-4205 CassandraCacheStore should start IgniteThread threads in loadCache() method"

Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/561d2cf0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/561d2cf0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/561d2cf0

Branch: refs/heads/ignite-5075
Commit: 561d2cf048be59524acfbe2ac064d8b633b99c37
Parents: f74d51c
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Mon May 22 14:30:30 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon May 22 14:30:30 2017 +0300

----------------------------------------------------------------------
 .../store/cassandra/CassandraCacheStore.java    | 15 +++-
 .../ignite/tests/IgnitePersistentStoreTest.java | 62 +++++++++++++-
 .../persistence/loadall_blob/ignite-config.xml  | 90 ++++++++++++++++++++
 .../loadall_blob/persistence-settings.xml       | 29 +++++++
 .../store/jdbc/CacheAbstractJdbcStore.java      |  6 +-
 5 files changed, 198 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index 98c8b40..b438946 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -23,16 +23,17 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import javax.cache.Cache;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.store.CacheStore;
@@ -52,7 +53,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.thread.IgniteThreadFactory;
 
 /**
  * Implementation of {@link CacheStore} backed by Cassandra database.
@@ -64,6 +67,14 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
     /** Buffer to store mutations performed withing transaction. */
     private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER";
 
+    /** Thread name. */
+    private static final String CACHE_LOADER_THREAD_NAME = "cassandra-cache-loader";
+
+    /** Auto-injected ignite instance. */
+    @SuppressWarnings("unused")
+    @IgniteInstanceResource
+    private Ignite ignite;
+
     /** Auto-injected store session. */
     @SuppressWarnings("unused")
     @CacheStoreSessionResource
@@ -109,7 +120,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
         Collection<Future<?>> futs = new ArrayList<>(args.length);
 
         try {
-            pool = Executors.newFixedThreadPool(maxPoolSize);
+            pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME));
 
             CassandraSession ses = getCassandraSession();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
index c8c7139..feccb24 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
@@ -25,9 +25,11 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.tests.pojos.Person;
@@ -42,9 +44,9 @@ import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.log4j.Logger;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.Assert;
 import org.springframework.core.io.ClassPathResource;
 
 /**
@@ -247,6 +249,34 @@ public class IgnitePersistentStoreTest {
 
     /** */
     @Test
+    public void blobBinaryLoadCacheTest() {
+        Ignition.stopAll(true);
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) {
+            IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2");
+
+            assert ignite.configuration().getMarshaller() instanceof BinaryMarshaller;
+
+            personCache.put(1L, new PojoPerson(1, "name"));
+
+            assert personCache.withKeepBinary().get(1L) instanceof BinaryObject;
+        }
+
+        Ignition.stopAll(true);
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) {
+            IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2");
+
+            personCache.loadCache(null, null);
+
+            PojoPerson person = personCache.get(1L);
+
+            LOGGER.info("loadCache tests passed");
+        }
+    }
+
+    /** */
+    @Test
     public void pojoStrategyTest() {
         Ignition.stopAll(true);
 
@@ -673,4 +703,34 @@ public class IgnitePersistentStoreTest {
                 " concurrency and " + isolation + " isolation level");
         LOGGER.info("-----------------------------------------------------------------------------------");
     }
+
+    /** */
+    public static class PojoPerson {
+        /** */
+        private int id;
+
+        /** */
+        private String name;
+
+        /** */
+        public PojoPerson() {
+            // No-op.
+        }
+
+        /** */
+        public PojoPerson(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /** */
+        public int getId() {
+            return id;
+        }
+
+        /** */
+        public String getName() {
+            return name;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
new file mode 100644
index 0000000..115e263
--- /dev/null
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+    <!-- Cassandra connection settings -->
+    <import resource="classpath:org/apache/ignite/tests/cassandra/connection-settings.xml"/>
+
+    <!-- Persistence settings for 'cache2' -->
+    <bean id="cache2_persistence_settings"
+          class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
+        <constructor-arg type="org.springframework.core.io.Resource"
+                         value="classpath:org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml"/>
+    </bean>
+
+    <!-- Ignite configuration -->
+    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+
+        <property name="marshaller">
+            <bean class="org.apache.ignite.internal.binary.BinaryMarshaller"/>
+        </property>
+
+        <property name="binaryConfiguration">
+            <bean class="org.apache.ignite.configuration.BinaryConfiguration">
+                <property name="compactFooter" value="false"/>
+            </bean>
+        </property>
+
+        <property name="cacheConfiguration">
+            <list>
+                <!-- Configuring persistence for "cache2" cache -->
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="cache2"/>
+                    <property name="readThrough" value="true"/>
+                    <property name="writeThrough" value="true"/>
+                    <property name="storeKeepBinary" value="true"/>
+                    <property name="cacheStoreFactory">
+                        <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
+                            <property name="dataSourceBean" value="cassandraAdminDataSource"/>
+                            <property name="persistenceSettingsBean" value="cache2_persistence_settings"/>
+                        </bean>
+                    </property>
+                </bean>
+            </list>
+        </property>
+
+        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <!--
+                        Ignite provides several options for automatic discovery that can be used
+                        instead os static IP based discovery. For information on all options refer
+                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
+                    -->
+                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
+                        <property name="addresses">
+                            <list>
+                                <!-- In distributed environment, replace with actual host IP address. -->
+                                <value>127.0.0.1:47500..47509</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
new file mode 100644
index 0000000..e872201
--- /dev/null
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
@@ -0,0 +1,29 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<persistence keyspace="test1" table="blob_test3">
+    <!-- By default Java standard serialization is going to be used -->
+    <keyPersistence class="java.lang.Long"
+                    strategy="BLOB"
+                    column="key"/>
+
+    <!-- Kryo serialization specified to be used -->
+    <valuePersistence class="org.apache.ignite.tests.pojos.Person"
+                      strategy="BLOB"
+                      serializer="org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer"
+                      column="value"/>
+</persistence>

http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index 46e9022..b1ec38d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -69,6 +69,7 @@ import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.resources.CacheStoreSessionResource;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.thread.IgniteThreadFactory;
 import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -121,6 +122,9 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
     /** Connection attribute property name. */
     protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
 
+    /** Thread name. */
+    private static final String CACHE_LOADER_THREAD_NAME = "jdbc-cache-loader";
+
     /** Built in Java types names. */
     protected static final Collection<String> BUILT_IN_TYPES = new HashSet<>();
 
@@ -680,7 +684,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
         String cacheName = session().cacheName();
 
         try {
-            pool = Executors.newFixedThreadPool(maxPoolSize);
+            pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME));
 
             Collection<Future<?>> futs = new ArrayList<>();