You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 12:37:57 UTC
[27/41] incubator-ignite git commit: IGNITE-891 - Cache store
improvements
IGNITE-891 - Cache store improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e6cc139e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e6cc139e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e6cc139e
Branch: refs/heads/ignite-218
Commit: e6cc139efa4bb33a334521b6ad0e463ff5b390e8
Parents: 9f88b05
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Sun May 24 23:36:21 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Sun May 24 23:36:21 2015 -0700
----------------------------------------------------------------------
...heStoreSessionListenerLifecycleSelfTest.java | 395 +++++++++++++++++++
1 file changed, 395 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e6cc139e/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifecycleSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifecycleSelfTest.java
new file mode 100644
index 0000000..814c8a5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerLifecycleSelfTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Store session listeners test.
+ */
+public class CacheStoreSessionListenerLifecycleSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final Queue<String> evts = new ConcurrentLinkedDeque<>();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCacheStoreSessionListenerFactories(
+ new SessionListenerFactory("Shared 1"),
+ new SessionListenerFactory("Shared 2")
+ );
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ evts.clear();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoCaches() throws Exception {
+ try {
+ startGrid();
+ }
+ finally {
+ stopGrid();
+ }
+
+ assertEqualsCollections(Arrays.asList("Shared 1 START", "Shared 2 START", "Shared 1 STOP", "Shared 2 STOP"),
+ evts);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoOverride() throws Exception {
+ try {
+ Ignite ignite = startGrid();
+
+ for (int i = 0; i < 2; i++) {
+ CacheConfiguration<Integer, Integer> cacheCfg = cacheConfiguration("cache-" + i);
+
+ cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ ignite.createCache(cacheCfg);
+ }
+
+ ignite.cache("cache-0").put(1, 1);
+ ignite.cache("cache-1").put(1, 1);
+
+ try (Transaction tx = ignite.transactions().txStart()) {
+ ignite.cache("cache-0").put(2, 2);
+ ignite.cache("cache-0").put(3, 3);
+ ignite.cache("cache-1").put(2, 2);
+ ignite.cache("cache-1").put(3, 3);
+
+ tx.commit();
+ }
+ }
+ finally {
+ stopGrid();
+ }
+
+ assertEqualsCollections(Arrays.asList(
+ "Shared 1 START",
+ "Shared 2 START",
+
+ // Put to cache-0.
+ "Shared 1 SESSION START cache-0",
+ "Shared 2 SESSION START cache-0",
+ "Shared 1 SESSION END cache-0",
+ "Shared 2 SESSION END cache-0",
+
+ // Put to cache-1.
+ "Shared 1 SESSION START cache-1",
+ "Shared 2 SESSION START cache-1",
+ "Shared 1 SESSION END cache-1",
+ "Shared 2 SESSION END cache-1",
+
+ // Transaction.
+ "Shared 1 SESSION START cache-0",
+ "Shared 2 SESSION START cache-0",
+ "Shared 1 SESSION START cache-1",
+ "Shared 2 SESSION START cache-1",
+ "Shared 1 SESSION END cache-0",
+ "Shared 2 SESSION END cache-0",
+ "Shared 1 SESSION END cache-1",
+ "Shared 2 SESSION END cache-1",
+
+ "Shared 1 STOP",
+ "Shared 2 STOP"
+ ), evts);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartialOverride() throws Exception {
+ try {
+ Ignite ignite = startGrid();
+
+ for (int i = 0; i < 2; i++) {
+ String name = "cache-" + i;
+
+ CacheConfiguration cacheCfg = cacheConfiguration(name);
+
+ cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ if (i == 0) {
+ cacheCfg.setCacheStoreSessionListenerFactories(
+ new SessionListenerFactory(name + " 1"),
+ new SessionListenerFactory(name + " 2")
+ );
+ }
+
+ ignite.createCache(cacheCfg);
+ }
+
+ ignite.cache("cache-0").put(1, 1);
+ ignite.cache("cache-1").put(1, 1);
+
+ try (Transaction tx = ignite.transactions().txStart()) {
+ ignite.cache("cache-0").put(2, 2);
+ ignite.cache("cache-0").put(3, 3);
+ ignite.cache("cache-1").put(2, 2);
+ ignite.cache("cache-1").put(3, 3);
+
+ tx.commit();
+ }
+ }
+ finally {
+ stopGrid();
+ }
+
+ assertEqualsCollections(Arrays.asList(
+ "Shared 1 START",
+ "Shared 2 START",
+ "cache-0 1 START",
+ "cache-0 2 START",
+
+ // Put to cache-0.
+ "cache-0 1 SESSION START cache-0",
+ "cache-0 2 SESSION START cache-0",
+ "cache-0 1 SESSION END cache-0",
+ "cache-0 2 SESSION END cache-0",
+
+ // Put to cache-1.
+ "Shared 1 SESSION START cache-1",
+ "Shared 2 SESSION START cache-1",
+ "Shared 1 SESSION END cache-1",
+ "Shared 2 SESSION END cache-1",
+
+ // Transaction.
+ "cache-0 1 SESSION START cache-0",
+ "cache-0 2 SESSION START cache-0",
+ "Shared 1 SESSION START cache-1",
+ "Shared 2 SESSION START cache-1",
+ "cache-0 1 SESSION END cache-0",
+ "cache-0 2 SESSION END cache-0",
+ "Shared 1 SESSION END cache-1",
+ "Shared 2 SESSION END cache-1",
+
+ "cache-0 1 STOP",
+ "cache-0 2 STOP",
+ "Shared 1 STOP",
+ "Shared 2 STOP"
+ ), evts);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testOverride() throws Exception {
+ try {
+ Ignite ignite = startGrid();
+
+ for (int i = 0; i < 2; i++) {
+ String name = "cache-" + i;
+
+ CacheConfiguration cacheCfg = cacheConfiguration(name);
+
+ cacheCfg.setCacheStoreSessionListenerFactories(new SessionListenerFactory(name + " 1"), new SessionListenerFactory(name + " 2"));
+
+ ignite.createCache(cacheCfg);
+ }
+
+ ignite.cache("cache-0").put(1, 1);
+ ignite.cache("cache-1").put(1, 1);
+
+ try (Transaction tx = ignite.transactions().txStart()) {
+ ignite.cache("cache-0").put(2, 2);
+ ignite.cache("cache-0").put(3, 3);
+ ignite.cache("cache-1").put(2, 2);
+ ignite.cache("cache-1").put(3, 3);
+
+ tx.commit();
+ }
+ }
+ finally {
+ stopGrid();
+ }
+
+ assertEqualsCollections(Arrays.asList(
+ "Shared 1 START",
+ "Shared 2 START",
+ "cache-0 1 START",
+ "cache-0 2 START",
+ "cache-1 1 START",
+ "cache-1 2 START",
+
+ // Put to cache-0.
+ "cache-0 1 SESSION START cache-0",
+ "cache-0 2 SESSION START cache-0",
+ "cache-0 1 SESSION END cache-0",
+ "cache-0 2 SESSION END cache-0",
+
+ // Put to cache-1.
+ "cache-1 1 SESSION START cache-1",
+ "cache-1 2 SESSION START cache-1",
+ "cache-1 1 SESSION END cache-1",
+ "cache-1 2 SESSION END cache-1",
+
+ // Transaction.
+ "cache-0 1 SESSION START cache-0",
+ "cache-0 2 SESSION START cache-0",
+ "cache-1 1 SESSION START cache-1",
+ "cache-1 2 SESSION START cache-1",
+ "cache-0 1 SESSION END cache-0",
+ "cache-0 2 SESSION END cache-0",
+ "cache-1 1 SESSION END cache-1",
+ "cache-1 2 SESSION END cache-1",
+
+ "cache-0 1 STOP",
+ "cache-0 2 STOP",
+ "cache-1 1 STOP",
+ "cache-1 2 STOP",
+ "Shared 1 STOP",
+ "Shared 2 STOP"
+ ), evts);
+ }
+
+ /**
+ * @param name Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(String name) {
+ CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(name);
+
+ cacheCfg.setAtomicityMode(TRANSACTIONAL);
+ cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(Store.class));
+ cacheCfg.setWriteThrough(true);
+
+ return cacheCfg;
+ }
+
+ /**
+ */
+ private static class SessionListener implements CacheStoreSessionListener, LifecycleAware {
+ /** */
+ private final String name;
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /**
+ * @param name Name.
+ */
+ private SessionListener(String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ assertNotNull(ignite);
+
+ evts.add(name + " START");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ assertNotNull(ignite);
+
+ evts.add(name + " STOP");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionStart(CacheStoreSession ses) {
+ assertNotNull(ignite);
+
+ evts.add(name + " SESSION START " + ses.cacheName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+ assertNotNull(ignite);
+
+ evts.add(name + " SESSION END " + ses.cacheName());
+ }
+ }
+
+ /**
+ */
+ private static class SessionListenerFactory implements Factory<CacheStoreSessionListener> {
+ /** */
+ private String name;
+
+ /**
+ * @param name Name.
+ */
+ private SessionListenerFactory(String name) {
+ this.name = name;
+ }
+
+ @Override public CacheStoreSessionListener create() {
+ return new SessionListener(name);
+ }
+ }
+
+ /**
+ */
+ public static class Store extends CacheStoreAdapter<Integer, Integer> {
+ public Store() {
+ }
+
+ /** {@inheritDoc} */
+ @Override public Integer load(Integer key) throws CacheLoaderException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry)
+ throws CacheWriterException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ // No-op.
+ }
+ }
+}