You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/17 12:05:38 UTC
ignite git commit: ignite-6924: Fixed missed CacheStoreSessionListener#onSessionStart() call
Repository: ignite
Updated Branches:
refs/heads/master 1ebeee00e -> e3099cc47
ignite-6924: Fixed missed CacheStoreSessionListener#onSessionStart() call
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3099cc4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3099cc4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3099cc4
Branch: refs/heads/master
Commit: e3099cc47e4086605312d88aeda3ca85e1e6aeff
Parents: 1ebeee0
Author: Slava Koptilin <sl...@gmail.com>
Authored: Fri Nov 17 15:05:31 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Nov 17 15:05:31 2017 +0300
----------------------------------------------------------------------
.../cache/store/CacheStoreManager.java | 12 +-
.../store/GridCacheStoreManagerAdapter.java | 7 ++
.../cache/store/GridCacheWriteBehindStore.java | 10 +-
...reSessionListenerWriteBehindEnabledTest.java | 117 +++++++++++++++++--
4 files changed, 135 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
index 83428b3..e22cb05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
@@ -175,13 +175,23 @@ public interface CacheStoreManager extends GridCacheManager {
public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last, boolean storeSessionEnded) throws IgniteCheckedException;
/**
- * End session initiated by write-behind store.
+ * Start session initiated by write-behind store.
*
* @throws IgniteCheckedException If failed.
*/
public void writeBehindSessionInit() throws IgniteCheckedException;
/**
+ * Notifies cache store session listeners.
+ *
+ * This method is called by the write-behind store when the back-pressure mechanism is triggered.
+ * It is assumed that the cache store session has already been started by CacheStoreManager.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void writeBehindCacheStoreSessionListenerStart() throws IgniteCheckedException;
+
+ /**
* End session initiated by write-behind store.
*
* @param threwEx If exception was thrown.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 22c2381..e862c0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -829,6 +829,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
+ @Override public void writeBehindCacheStoreSessionListenerStart() throws IgniteCheckedException {
+ assert sesHolder.get() != null;
+
+ notifyCacheStoreSessionListeners(sesHolder.get(), null, true);
+ }
+
+ /** {@inheritDoc} */
@Override public void writeBehindSessionEnd(boolean threwEx) throws IgniteCheckedException {
sessionEnd0(null, threwEx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index 831b1b0..d7a13e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -798,8 +798,14 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
Flusher flusher
) {
try {
- if (initSes && storeMgr != null)
- storeMgr.writeBehindSessionInit();
+ if (storeMgr != null) {
+ if (initSes)
+ storeMgr.writeBehindSessionInit();
+ else
+ // Back-pressure mechanism is running: the cache store session
+ // must already have been initialized by storeMgr.
+ storeMgr.writeBehindCacheStoreSessionListenerStart();
+ }
boolean threwEx = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java
index b9095d0..c9a912a 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import javax.cache.Cache;
@@ -34,14 +35,15 @@ import javax.cache.configuration.FactoryBuilder;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.sql.DataSource;
-import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore;
-import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
/**
* This class tests that calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
@@ -61,6 +63,9 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
/** */
private static final AtomicInteger entryCnt = new AtomicInteger();
+ /** */
+ private static final AtomicInteger uninitializedListenerCnt = new AtomicInteger();
+
/** {@inheritDoc} */
@Override protected int gridCount() {
return 1;
@@ -93,6 +98,8 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
operations.clear();
entryCnt.set(0);
+
+ uninitializedListenerCnt.set(0);
}
/**
@@ -136,6 +143,83 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
}
/**
+ * Tests that cache store session listeners are notified by write-behind store.
+ */
+ public void testFlushSingleValue() throws Exception {
+ CacheConfiguration cfg = cacheConfiguration(getTestIgniteInstanceName());
+
+ cfg.setName("back-pressure-control");
+ cfg.setWriteBehindBatchSize(2);
+ cfg.setWriteBehindFlushSize(2);
+ cfg.setWriteBehindFlushFrequency(1_000);
+ cfg.setWriteBehindCoalescing(true);
+
+ IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(cfg);
+
+ try {
+ int nUploaders = 5;
+
+ final CyclicBarrier barrier = new CyclicBarrier(nUploaders);
+
+ IgniteInternalFuture[] uploaders = new IgniteInternalFuture[nUploaders];
+
+ for (int i = 0; i < nUploaders; ++i) {
+ uploaders[i] = GridTestUtils.runAsync(
+ new Uploader(cache, barrier, i * CNT),
+ "uploader-" + i);
+ }
+
+ for (int i = 0; i < nUploaders; ++i)
+ uploaders[i].get();
+
+ assertEquals("Uninitialized cache store session listener.", 0, uninitializedListenerCnt.get());
+ }
+ finally {
+ cache.destroy();
+ }
+ }
+
+ /**
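+ * Task that waits on the shared barrier and then puts {@code CNT} consecutive keys, beginning at {@code start}, into the cache.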
+ */
+ public static class Uploader implements Runnable {
+ /** */
+ private final int start;
+
+ /** */
+ private final CyclicBarrier barrier;
+
+ /** */
+ private final IgniteCache<Object, Object> cache;
+
+ /**
+ * @param cache Ignite cache.
+ * @param barrier Cyclic barrier.
+ * @param start Key index.
+ */
+ public Uploader(IgniteCache<Object, Object> cache, CyclicBarrier barrier, int start) {
+ this.cache = cache;
+
+ this.barrier = barrier;
+
+ this.start = start;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ barrier.await();
+
+ for (int i = start; i < start + CNT; ++i)
+ cache.put(i, i);
+ }
+ catch (Exception e) {
+ fail("Unexpected exception [" + e + "]");
+ }
+ }
+ }
+
+ /**
* @param startedSessions Number of expected sessions.
*/
private void checkSessionCounters(int startedSessions) {
@@ -145,6 +229,8 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
assertEquals(CNT, entryCnt.get());
+ assertEquals("Uninitialized cache store session listener.", 0, uninitializedListenerCnt.get());
+
checkOpCount(operations, OperationType.SESSION_START, startedSessions);
checkOpCount(operations, OperationType.SESSION_END, startedSessions);
@@ -201,18 +287,19 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
* Test cache store session listener.
*/
public static class TestCacheStoreSessionListener extends CacheJdbcStoreSessionListener {
- /** */
- @IgniteInstanceResource
- private Ignite ignite;
-
/** {@inheritDoc} */
@Override public void onSessionStart(CacheStoreSession ses) {
operations.add(OperationType.SESSION_START);
+
+ if (ses.attachment() == null)
+ ses.attach(new Object());
}
/** {@inheritDoc} */
@Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
operations.add(OperationType.SESSION_END);
+
+ ses.attach(null);
}
}
@@ -224,31 +311,45 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
*/
public static class EmptyCacheStore extends CacheStoreAdapter<Object, Object> {
/** */
- @IgniteInstanceResource
- private Ignite ignite;
+ @CacheStoreSessionResource
+ private CacheStoreSession ses;
/** {@inheritDoc} */
@Override public Object load(Object key) throws CacheLoaderException {
entryCnt.getAndIncrement();
+
+ if (ses.attachment() == null)
+ uninitializedListenerCnt.incrementAndGet();
+
return null;
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) {
entryCnt.addAndGet(entries.size());
+
+ if (ses.attachment() == null)
+ uninitializedListenerCnt.incrementAndGet();
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry entry) throws CacheWriterException {
+ if (ses.attachment() == null)
+ uninitializedListenerCnt.incrementAndGet();
}
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys) {
entryCnt.addAndGet(keys.size());
+
+ if (ses.attachment() == null)
+ uninitializedListenerCnt.incrementAndGet();
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
+ if (ses.attachment() == null)
+ uninitializedListenerCnt.incrementAndGet();
}
}