You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/23 12:06:35 UTC
[3/6] ignite git commit: Fixed "IGNITE-4205 CassandraCacheStore
should start IgniteThread threads in loadCache() method"
Fixed "IGNITE-4205 CassandraCacheStore should start IgniteThread threads in loadCache() method"
Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/561d2cf0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/561d2cf0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/561d2cf0
Branch: refs/heads/ignite-5075
Commit: 561d2cf048be59524acfbe2ac064d8b633b99c37
Parents: f74d51c
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Mon May 22 14:30:30 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon May 22 14:30:30 2017 +0300
----------------------------------------------------------------------
.../store/cassandra/CassandraCacheStore.java | 15 +++-
.../ignite/tests/IgnitePersistentStoreTest.java | 62 +++++++++++++-
.../persistence/loadall_blob/ignite-config.xml | 90 ++++++++++++++++++++
.../loadall_blob/persistence-settings.xml | 29 +++++++
.../store/jdbc/CacheAbstractJdbcStore.java | 6 +-
5 files changed, 198 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index 98c8b40..b438946 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -23,16 +23,17 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStore;
@@ -52,7 +53,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.thread.IgniteThreadFactory;
/**
* Implementation of {@link CacheStore} backed by Cassandra database.
@@ -64,6 +67,14 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
/** Buffer to store mutations performed withing transaction. */
private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER";
+ /** Thread name. */
+ private static final String CACHE_LOADER_THREAD_NAME = "cassandra-cache-loader";
+
+ /** Auto-injected ignite instance. */
+ @SuppressWarnings("unused")
+ @IgniteInstanceResource
+ private Ignite ignite;
+
/** Auto-injected store session. */
@SuppressWarnings("unused")
@CacheStoreSessionResource
@@ -109,7 +120,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
Collection<Future<?>> futs = new ArrayList<>(args.length);
try {
- pool = Executors.newFixedThreadPool(maxPoolSize);
+ pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME));
CassandraSession ses = getCassandraSession();
http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
index c8c7139..feccb24 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
@@ -25,9 +25,11 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.tests.pojos.Person;
@@ -42,9 +44,9 @@ import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.Assert;
import org.springframework.core.io.ClassPathResource;
/**
@@ -247,6 +249,34 @@ public class IgnitePersistentStoreTest {
/** */
@Test
+ public void blobBinaryLoadCacheTest() {
+ Ignition.stopAll(true);
+
+ try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) {
+ IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2");
+
+ assert ignite.configuration().getMarshaller() instanceof BinaryMarshaller;
+
+ personCache.put(1L, new PojoPerson(1, "name"));
+
+ assert personCache.withKeepBinary().get(1L) instanceof BinaryObject;
+ }
+
+ Ignition.stopAll(true);
+
+ try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) {
+ IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2");
+
+ personCache.loadCache(null, null);
+
+ PojoPerson person = personCache.get(1L);
+
+ LOGGER.info("loadCache tests passed");
+ }
+ }
+
+ /** */
+ @Test
public void pojoStrategyTest() {
Ignition.stopAll(true);
@@ -673,4 +703,34 @@ public class IgnitePersistentStoreTest {
" concurrency and " + isolation + " isolation level");
LOGGER.info("-----------------------------------------------------------------------------------");
}
+
+ /** */
+ public static class PojoPerson {
+ /** */
+ private int id;
+
+ /** */
+ private String name;
+
+ /** */
+ public PojoPerson() {
+ // No-op.
+ }
+
+ /** */
+ public PojoPerson(int id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ /** */
+ public int getId() {
+ return id;
+ }
+
+ /** */
+ public String getName() {
+ return name;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
new file mode 100644
index 0000000..115e263
--- /dev/null
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <!-- Cassandra connection settings -->
+ <import resource="classpath:org/apache/ignite/tests/cassandra/connection-settings.xml"/>
+
+ <!-- Persistence settings for 'cache2' -->
+ <bean id="cache2_persistence_settings"
+ class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
+ <constructor-arg type="org.springframework.core.io.Resource"
+ value="classpath:org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml"/>
+ </bean>
+
+ <!-- Ignite configuration -->
+ <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+
+ <property name="marshaller">
+ <bean class="org.apache.ignite.internal.binary.BinaryMarshaller"/>
+ </property>
+
+ <property name="binaryConfiguration">
+ <bean class="org.apache.ignite.configuration.BinaryConfiguration">
+ <property name="compactFooter" value="false"/>
+ </bean>
+ </property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <!-- Configuring persistence for "cache2" cache -->
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="cache2"/>
+ <property name="readThrough" value="true"/>
+ <property name="writeThrough" value="true"/>
+ <property name="storeKeepBinary" value="true"/>
+ <property name="cacheStoreFactory">
+ <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
+ <property name="dataSourceBean" value="cassandraAdminDataSource"/>
+ <property name="persistenceSettingsBean" value="cache2_persistence_settings"/>
+ </bean>
+ </property>
+ </bean>
+ </list>
+ </property>
+
+ <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <!--
+ Ignite provides several options for automatic discovery that can be used
+ instead os static IP based discovery. For information on all options refer
+ to our documentation: http://apacheignite.readme.io/docs/cluster-config
+ -->
+ <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47500..47509</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
new file mode 100644
index 0000000..e872201
--- /dev/null
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
@@ -0,0 +1,29 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<persistence keyspace="test1" table="blob_test3">
+ <!-- By default Java standard serialization is going to be used -->
+ <keyPersistence class="java.lang.Long"
+ strategy="BLOB"
+ column="key"/>
+
+ <!-- Kryo serialization specified to be used -->
+ <valuePersistence class="org.apache.ignite.tests.pojos.Person"
+ strategy="BLOB"
+ serializer="org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer"
+ column="value"/>
+</persistence>
http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index 46e9022..b1ec38d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -69,6 +69,7 @@ import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.resources.CacheStoreSessionResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.thread.IgniteThreadFactory;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -121,6 +122,9 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
/** Connection attribute property name. */
protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
+ /** Thread name. */
+ private static final String CACHE_LOADER_THREAD_NAME = "jdbc-cache-loader";
+
/** Built in Java types names. */
protected static final Collection<String> BUILT_IN_TYPES = new HashSet<>();
@@ -680,7 +684,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
String cacheName = session().cacheName();
try {
- pool = Executors.newFixedThreadPool(maxPoolSize);
+ pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME));
Collection<Future<?>> futs = new ArrayList<>();