You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/09 15:11:30 UTC
ignite git commit: ignite-6669 Do not call CacheStoreSessionListener
if store operation is not executed
Repository: ignite
Updated Branches:
refs/heads/master c939bdba3 -> b8672d7d6
ignite-6669 Do not call CacheStoreSessionListener if store operation is not executed
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8672d7d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8672d7d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8672d7d
Branch: refs/heads/master
Commit: b8672d7d691981be3c10f74e97ae2caa5ddd1593
Parents: c939bdb
Author: Slava Koptilin <sl...@gmail.com>
Authored: Thu Nov 9 18:10:31 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Nov 9 18:10:31 2017 +0300
----------------------------------------------------------------------
.../store/GridCacheStoreManagerAdapter.java | 94 +++++-
...oreListenerRWThroughDisabledAtomicCache.java | 33 ++
...enerRWThroughDisabledTransactionalCache.java | 138 +++++++++
...SessionListenerReadWriteThroughDisabled.java | 291 ++++++++++++++++++
...eStoreSessionListenerWriteBehindEnabled.java | 304 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 8 +-
.../Cache/Store/CacheStoreSessionTest.cs | 13 +-
7 files changed, 856 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 9fe1f0c..22c2381 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -106,6 +106,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
private boolean writeThrough;
/** */
+ private boolean readThrough;
+
+ /** */
private Collection<CacheStoreSessionListener> sesLsnrs;
/** */
@@ -122,6 +125,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
writeThrough = cfg.isWriteThrough();
+ readThrough = cfg.isReadThrough();
+
this.cfgStore = cfgStore;
store = cacheStoreWrapper(ctx, cfgStore, cfg);
@@ -306,7 +311,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
log.debug(S.toString("Loading value from store for key",
"key", storeKey, true));
- sessionInit0(tx);
+ sessionInit0(tx, StoreOperation.READ, false);
boolean threwEx = true;
@@ -442,7 +447,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
if (log.isDebugEnabled())
log.debug("Loading values from store for keys: " + keys0);
- sessionInit0(tx);
+ sessionInit0(tx, StoreOperation.READ, false);
boolean threwEx = true;
@@ -501,7 +506,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
if (log.isDebugEnabled())
log.debug("Loading all values from store.");
- sessionInit0(null);
+ sessionInit0(null, StoreOperation.READ, false);
boolean threwEx = true;
@@ -567,7 +572,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
"val", val0, true));
}
- sessionInit0(tx);
+ sessionInit0(tx, StoreOperation.WRITE, false);
boolean threwEx = true;
@@ -622,7 +627,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
if (log.isDebugEnabled())
log.debug("Storing values in cache store [entries=" + entries + ']');
- sessionInit0(tx);
+ sessionInit0(tx, StoreOperation.WRITE, false);
boolean threwEx = true;
@@ -675,7 +680,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
if (log.isDebugEnabled())
log.debug(S.toString("Removing value from cache store", "key", key0, true));
- sessionInit0(tx);
+ sessionInit0(tx, StoreOperation.WRITE, false);
boolean threwEx = true;
@@ -727,7 +732,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
log.debug(S.toString("Removing values from cache store",
"keys", keys0, true));
- sessionInit0(tx);
+ sessionInit0(tx, StoreOperation.WRITE, false);
boolean threwEx = true;
@@ -778,10 +783,10 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
boolean storeSessionEnded) throws IgniteCheckedException {
assert store != null;
- sessionInit0(tx);
+ sessionInit0(tx, commit? StoreOperation.COMMIT: StoreOperation.ROLLBACK, false);
try {
- if (sesLsnrs != null) {
+ if (sesLsnrs != null && sesHolder.get().contains(store)) {
for (CacheStoreSessionListener lsnr : sesLsnrs)
lsnr.onSessionEnd(locSes, commit);
}
@@ -820,7 +825,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
/** {@inheritDoc} */
@Override public void writeBehindSessionInit() throws IgniteCheckedException {
- sessionInit0(null);
+ sessionInit0(null, null, true);
}
/** {@inheritDoc} */
@@ -830,9 +835,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
/**
* @param tx Current transaction.
+ * @param op Store operation.
+ * @param writeBehindStoreInitiator {@code true} if method call is initiated by {@link GridCacheWriteBehindStore}.
* @throws IgniteCheckedException If failed.
*/
- private void sessionInit0(@Nullable IgniteInternalTx tx) throws IgniteCheckedException {
+ private void sessionInit0(@Nullable IgniteInternalTx tx, @Nullable StoreOperation op,
+ boolean writeBehindStoreInitiator) throws IgniteCheckedException {
assert sesHolder != null;
SessionData ses;
@@ -854,8 +862,45 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
sesHolder.set(ses);
+ notifyCacheStoreSessionListeners(ses, op, writeBehindStoreInitiator);
+ }
+
+ /**
+ * @param ses Current session.
+ * @param op Store operation.
+ * @param writeBehindStoreInitiator {@code True} if method call is initiated by {@link GridCacheWriteBehindStore}.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOperation op,
+ boolean writeBehindStoreInitiator) throws IgniteCheckedException {
try {
- if (!ses.started(store) && sesLsnrs != null) {
+ boolean notifyLsnrs = false;
+
+ if (writeBehindStoreInitiator)
+ notifyLsnrs = !ses.started(store) && sesLsnrs != null;
+ else {
+ assert op != null;
+
+ switch (op) {
+ case READ:
+ notifyLsnrs = readThrough && !ses.started(store) && sesLsnrs != null;
+ break;
+
+ case WRITE:
+ notifyLsnrs = !cacheConfiguration().isWriteBehindEnabled() && writeThrough
+ && !ses.started(store) && sesLsnrs != null;
+ break;
+
+ case COMMIT:
+ case ROLLBACK:
+ // No needs to start the session (if not started yet) and notify listeners.
+ break;
+
+ default:
+ assert false : "Unexpected operation: " + op.toString();
+ }
+ }
+ if (notifyLsnrs) {
for (CacheStoreSessionListener lsnr : sesLsnrs)
lsnr.onSessionStart(locSes);
}
@@ -871,7 +916,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException {
try {
if (tx == null) {
- if (sesLsnrs != null) {
+ if (sesLsnrs != null && sesHolder.get().contains(store)) {
for (CacheStoreSessionListener lsnr : sesLsnrs)
lsnr.onSessionEnd(locSes, !threwEx);
}
@@ -995,6 +1040,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
return !started.remove(store);
}
+ /**
+ * @param store Cache store.
+ * @return {@code True} if session started.
+ */
+ private boolean contains(CacheStore store) {
+ return started.contains(store);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SessionData.class, this, "tx", CU.txString(tx != null ? tx.tx : null));
@@ -1429,4 +1482,19 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
throw new UnsupportedOperationException();
}
}
+
+ /** Enumeration that represents possible operations on the underlying store. */
+ private enum StoreOperation {
+ /** Read key-value pair from the underlying store. */
+ READ,
+
+ /** Update or remove key from the underlying store. */
+ WRITE,
+
+ /** Commit changes to the underlying store. */
+ COMMIT,
+
+ /** Rollback changes to the underlying store. */
+ ROLLBACK
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java
new file mode 100644
index 0000000..9b59940
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ * This class tests that redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are not executed.
+ */
+public class CacheStoreListenerRWThroughDisabledAtomicCache extends CacheStoreSessionListenerReadWriteThroughDisabled {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java
new file mode 100644
index 0000000..6502c97
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import java.util.Random;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * This class tests that redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are not executed.
+ */
+public class CacheStoreListenerRWThroughDisabledTransactionalCache extends CacheStoreSessionListenerReadWriteThroughDisabled {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /**
+ * Tests {@link IgniteCache#get(Object)} with disabled read-through and write-through modes.
+ */
+ public void testTransactionalLookup() {
+ testTransactionalLookup(OPTIMISTIC, READ_COMMITTED);
+ testTransactionalLookup(OPTIMISTIC, REPEATABLE_READ);
+ testTransactionalLookup(OPTIMISTIC, SERIALIZABLE);
+
+ testTransactionalLookup(PESSIMISTIC, READ_COMMITTED);
+ testTransactionalLookup(PESSIMISTIC, REPEATABLE_READ);
+ testTransactionalLookup(PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @param concurrency Transaction concurrency level.
+ * @param isolation Transaction isolation level.
+ */
+ private void testTransactionalLookup(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+ IgniteCache cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ Random r = new Random();
+
+ try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) {
+ for (int i = 0; i < CNT; ++i)
+ cache.get(r.nextInt());
+
+ tx.commit();
+ }
+ }
+
+ /**
+ * Tests {@link IgniteCache#put(Object, Object)} with disabled read-through and write-through modes.
+ */
+ public void testTransactionalUpdate() {
+ testTransactionalUpdate(OPTIMISTIC, READ_COMMITTED);
+ testTransactionalUpdate(OPTIMISTIC, REPEATABLE_READ);
+ testTransactionalUpdate(OPTIMISTIC, SERIALIZABLE);
+
+ testTransactionalUpdate(PESSIMISTIC, READ_COMMITTED);
+ testTransactionalUpdate(PESSIMISTIC, REPEATABLE_READ);
+ testTransactionalUpdate(PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @param concurrency Transaction concurrency level.
+ * @param isolation Transaction isolation level.
+ */
+ private void testTransactionalUpdate(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+ IgniteCache cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ Random r = new Random();
+
+ try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) {
+ for (int i = 0; i < CNT; ++i)
+ cache.put(r.nextInt(), "test-value");
+
+ tx.commit();
+ }
+ }
+
+ /**
+ * Tests {@link IgniteCache#remove(Object)} with disabled read-through and write-through modes.
+ */
+ public void testTransactionalRemove() {
+ testTransactionalRemove(OPTIMISTIC, READ_COMMITTED);
+ testTransactionalRemove(OPTIMISTIC, REPEATABLE_READ);
+ testTransactionalRemove(OPTIMISTIC, SERIALIZABLE);
+
+ testTransactionalRemove(PESSIMISTIC, READ_COMMITTED);
+ testTransactionalRemove(PESSIMISTIC, REPEATABLE_READ);
+ testTransactionalRemove(PESSIMISTIC, SERIALIZABLE);
+ }
+
+ /**
+ * @param concurrency Transaction concurrency level.
+ * @param isolation Transaction isolation level.
+ */
+ private void testTransactionalRemove(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+ IgniteCache cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ Random r = new Random();
+
+ try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) {
+ for (int i = 0; i < CNT; ++i) {
+ int key = r.nextInt();
+
+ cache.put(key, "test-value");
+
+ cache.remove(key, "test-value");
+ }
+
+ tx.commit();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java
new file mode 100644
index 0000000..1f6e97d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.logging.Logger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.sql.DataSource;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+
+/**
+ * This class tests that redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are not executed.
+ */
+public abstract class CacheStoreSessionListenerReadWriteThroughDisabled extends GridCacheAbstractSelfTest {
+ /** {@inheritDoc} */
+ protected int gridCount() {
+ return 2;
+ }
+
+ /** */
+ protected final int CNT = 100;
+
+ /** {@inheritDoc} */
+ protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
+ CacheConfiguration cacheCfg = super.cacheConfiguration(igniteInstanceName);
+
+ cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(EmptyCacheStore.class));
+
+ cacheCfg.setCacheStoreSessionListenerFactories(new CacheStoreSessionFactory());
+
+ cacheCfg.setReadThrough(false);
+ cacheCfg.setWriteThrough(false);
+
+ cacheCfg.setBackups(0);
+
+ return cacheCfg;
+ }
+
+ /** {@inheritDoc} */
+ protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+
+ /**
+ * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+ * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * while {@link IgniteCache#get(Object)} performed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testLookup() throws Exception {
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ Random r = new Random();
+
+ for (int i = 0; i < CNT; ++i)
+ cache.get(r.nextInt());
+ }
+
+ /**
+ * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+ * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * while {@link IgniteCache#getAll(Set)} performed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBatchLookup() throws Exception {
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ Random r = new Random();
+
+ Set<Object> values = new HashSet<>();
+
+ for (int i = 0; i < CNT; ++i)
+ values.add(r.nextInt());
+
+ cache.getAll(values);
+ }
+
+ /**
+ * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+ * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * while {@link IgniteCache#put(Object, Object)} performed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testUpdate() throws Exception {
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ Random r = new Random();
+
+ for (int i = 0; i < CNT; ++i)
+ cache.put(r.nextInt(), "test-value");
+ }
+
+ /**
+ * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+ * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * while {@link IgniteCache#putAll(Map)} performed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBatchUpdate() throws Exception {
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ Random r = new Random();
+
+ Map<Object, Object> values = new TreeMap<>();
+
+ for (int i = 0; i < CNT; ++i)
+ values.put(r.nextInt(), "test-value");
+
+ cache.putAll(values);
+ }
+
+ /**
+ * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+ * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * while {@link IgniteCache#remove(Object)} performed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRemove() throws Exception {
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ Random r = new Random();
+
+ for (int i = 0; i < CNT; ++i) {
+ int key = r.nextInt();
+
+ cache.put(key, "test-value");
+
+ cache.remove(key);
+ }
+ }
+
+ /**
+ * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and
+ * {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * while {@link IgniteCache#removeAll(Set)} performed.
+ *
+ * @throws Exception If failed.
+ */
+ public void testBatchRemove() throws Exception {
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ Random r = new Random();
+
+ Set<Object> values = new HashSet<>();
+
+ for (int i = 0; i < CNT; ++i) {
+ int key = r.nextInt();
+
+ cache.put(key, "test-value");
+
+ values.add(key);
+ }
+
+ cache.removeAll(values);
+ }
+
+ /**
+ * Cache store session factory.
+ */
+ public static class CacheStoreSessionFactory implements Factory<TestCacheStoreSessionListener> {
+ /** {@inheritDoc} */
+ @Override public TestCacheStoreSessionListener create() {
+ TestCacheStoreSessionListener lsnr = new TestCacheStoreSessionListener();
+ lsnr.setDataSource(new DataSourceStub());
+ return lsnr;
+ }
+ }
+
+ /**
+ * Test cache store session listener.
+ */
+ public static class TestCacheStoreSessionListener extends CacheJdbcStoreSessionListener {
+ /** {@inheritDoc} */
+ @Override public void onSessionStart(CacheStoreSession ses) {
+ fail("TestCacheStoreSessionListener.onSessionStart(CacheStoreSession) should not be called.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+ fail("TestCacheStoreSessionListener.onSessionEnd(CacheStoreSession, boolean) should not be called.");
+ }
+ }
+
+ /** Empty cache store implementation. All overridden methods should not be called while the test is running. */
+ public static class EmptyCacheStore extends CacheStoreAdapter {
+ /** {@inheritDoc} */
+ @Override public Object load(Object key) throws CacheLoaderException {
+ fail("EmptyCacheStore.load(Object) should not be called.");
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ fail("EmptyCacheStore.write(Cache.Entry) should not be called.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ fail("EmptyCacheStore.delete(Object) should not be called.");
+ }
+ }
+
+ /**
+ * Data source stub which should not be called.
+ */
+ public static class DataSourceStub implements DataSource, Serializable {
+ /** {@inheritDoc} */
+ @Override public Connection getConnection() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Connection getConnection(String username, String password) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T unwrap(Class<T> iface) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public PrintWriter getLogWriter() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setLogWriter(PrintWriter out) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setLoginTimeout(int seconds) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getLoginTimeout() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java
new file mode 100644
index 0000000..fbb881e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.sql.DataSource;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * This class tests that calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are executed from
+ * {@link GridCacheWriteBehindStore} only.
+ */
+public class CacheStoreSessionListenerWriteBehindEnabled extends GridCacheAbstractSelfTest {
+ /** */
+ protected final static int CNT = 100;
+
+ /** */
+ private final static int WRITE_BEHIND_FLUSH_FREQUENCY = 1000;
+
+ /** */
+ private static final List<OperationType> operations = Collections.synchronizedList(new ArrayList<OperationType>());
+
+ /** */
+ private static final AtomicInteger entryCnt = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
+ CacheConfiguration cacheCfg = super.cacheConfiguration(igniteInstanceName);
+
+ cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(EmptyCacheStore.class));
+
+ cacheCfg.setCacheStoreSessionListenerFactories(new CacheStoreSessionFactory());
+
+ cacheCfg.setReadThrough(true);
+ cacheCfg.setWriteThrough(true);
+
+ cacheCfg.setWriteBehindEnabled(true);
+ cacheCfg.setWriteBehindBatchSize(CNT * 2);
+ cacheCfg.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQUENCY);
+
+ cacheCfg.setBackups(0);
+
+ return cacheCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ operations.clear();
+
+ entryCnt.set(0);
+ }
+
+ /**
+ * Tests that there are no redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * while {@link IgniteCache#get(Object)} performed.
+ */
+ public void testLookup() {
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < CNT; ++i)
+ cache.get(i);
+
+ checkSessionCounters(CNT);
+ }
+
+ /**
+ * Tests that there are no redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * while {@link IgniteCache#put(Object, Object)} performed.
+ */
+ public void testUpdate() {
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < CNT; ++i)
+ cache.put(i, i);
+
+ checkSessionCounters(1);
+ }
+
+ /**
+ * Tests that there are no redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
+ * while {@link IgniteCache#remove(Object)} performed.
+ */
+ public void testRemove() {
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < CNT; ++i) {
+ cache.remove(i);
+ }
+
+ checkSessionCounters(1);
+ }
+
+ /**
+ * @param startedSessions Number of expected sessions.
+ */
+ private void checkSessionCounters(int startedSessions) {
+ try {
+ // Wait for GridCacheWriteBehindStore
+ Thread.sleep(WRITE_BEHIND_FLUSH_FREQUENCY * 4);
+
+ assertEquals(CNT, entryCnt.get());
+
+ checkOpCount(operations, OperationType.SESSION_START, startedSessions);
+
+ checkOpCount(operations, OperationType.SESSION_END, startedSessions);
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException("Failed to wait for the GridCacheWriteBehindStore due to interruption.", e);
+ }
+ }
+
+ /**
+ * @param operations List of {@link OperationType}.
+ * @param op Operation.
+ * @param expected Expected number of operations for the given {@code op}.
+ */
+ private void checkOpCount(List<OperationType> operations, OperationType op, int expected) {
+ int n = 0;
+
+ for (OperationType o : operations) {
+ if (op.equals(o))
+ ++n;
+ }
+
+ assertEquals("Operation=" + op.name(), expected, n);
+ }
+
+ /**
+ * Operation type.
+ */
+ public enum OperationType {
+ /**
+ * Cache store session started.
+ */
+ SESSION_START,
+
+ /**
+ * Cache store session ended.
+ */
+ SESSION_END,
+ }
+
+ /**
+ * Cache store session factory.
+ */
+ public static class CacheStoreSessionFactory implements Factory<TestCacheStoreSessionListener> {
+ /** {@inheritDoc} */
+ @Override public TestCacheStoreSessionListener create() {
+ TestCacheStoreSessionListener lsnr = new TestCacheStoreSessionListener();
+ lsnr.setDataSource(new DataSourceStub());
+ return lsnr;
+ }
+ }
+
+ /**
+ * Test cache store session listener.
+ */
+ public static class TestCacheStoreSessionListener extends CacheJdbcStoreSessionListener {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public void onSessionStart(CacheStoreSession ses) {
+ operations.add(OperationType.SESSION_START);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+ operations.add(OperationType.SESSION_END);
+ }
+ }
+
+ /**
+ * Test cache store.
+ *
+ * {@link EmptyCacheStore#writeAll(Collection)} and {@link EmptyCacheStore#deleteAll(Collection)} should be called
+ * by {@link GridCacheWriteBehindStore}.
+ */
+ public static class EmptyCacheStore extends CacheStoreAdapter<Object, Object> {
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public Object load(Object key) throws CacheLoaderException {
+ entryCnt.getAndIncrement();
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) {
+ entryCnt.addAndGet(entries.size());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteAll(Collection<?> keys) {
+ entryCnt.addAndGet(keys.size());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) throws CacheWriterException {
+ }
+ }
+
+ /**
+ * Data source stub which should not be called.
+ */
+ public static class DataSourceStub implements DataSource, Serializable {
+ /** {@inheritDoc} */
+ @Override public Connection getConnection() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Connection getConnection(String username, String password) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T unwrap(Class<T> iface) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public PrintWriter getLogWriter() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setLogWriter(PrintWriter out) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setLoginTimeout(int seconds) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getLoginTimeout() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index d931ea9..e4930e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -18,6 +18,9 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.cache.store.CacheStoreListenerRWThroughDisabledAtomicCache;
+import org.apache.ignite.cache.store.CacheStoreListenerRWThroughDisabledTransactionalCache;
+import org.apache.ignite.cache.store.CacheStoreSessionListenerWriteBehindEnabled;
import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListenerSelfTest;
import org.apache.ignite.internal.processors.GridCacheTxLoadFromStoreOnLockSelfTest;
import org.apache.ignite.internal.processors.cache.CacheClientStoreSelfTest;
@@ -276,6 +279,9 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheOffheapMapEntrySelfTest.class);
suite.addTestSuite(CacheJdbcStoreSessionListenerSelfTest.class);
+ suite.addTestSuite(CacheStoreListenerRWThroughDisabledAtomicCache.class);
+ suite.addTestSuite(CacheStoreListenerRWThroughDisabledTransactionalCache.class);
+ suite.addTestSuite(CacheStoreSessionListenerWriteBehindEnabled.class);
suite.addTestSuite(CacheClientStoreSelfTest.class);
suite.addTestSuite(CacheStoreUsageMultinodeStaticStartAtomicTest.class);
@@ -341,4 +347,4 @@ public class IgniteCacheTestSuite4 extends TestSuite {
return suite;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
index 818948c..6c9def3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs
@@ -106,17 +106,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store
tx.Rollback();
}
- // SessionEnd is called once per store instance.
- Assert.AreEqual(StoreCount, _dumps.Count);
-
- foreach (var ops in _dumps)
- {
- var op = ops.Single();
- Assert.AreEqual(OperationType.SesEnd, op.Type);
- Assert.IsFalse(op.Commit);
- }
-
- _dumps = new ConcurrentBag<ICollection<Operation>>();
+ // SessionEnd should not be called.
+ Assert.AreEqual(0, _dumps.Count);
// 2. Test puts.
using (var tx = ignite.GetTransactions().TxStart())