You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/11/17 12:05:38 UTC

ignite git commit: ignite-6924: Fixed missed CacheStoreSessionListener#onSessionStart() call

Repository: ignite
Updated Branches:
  refs/heads/master 1ebeee00e -> e3099cc47


ignite-6924: Fixed missed CacheStoreSessionListener#onSessionStart() call


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3099cc4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3099cc4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3099cc4

Branch: refs/heads/master
Commit: e3099cc47e4086605312d88aeda3ca85e1e6aeff
Parents: 1ebeee0
Author: Slava Koptilin <sl...@gmail.com>
Authored: Fri Nov 17 15:05:31 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Nov 17 15:05:31 2017 +0300

----------------------------------------------------------------------
 .../cache/store/CacheStoreManager.java          |  12 +-
 .../store/GridCacheStoreManagerAdapter.java     |   7 ++
 .../cache/store/GridCacheWriteBehindStore.java  |  10 +-
 ...reSessionListenerWriteBehindEnabledTest.java | 117 +++++++++++++++++--
 4 files changed, 135 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
index 83428b3..e22cb05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java
@@ -175,13 +175,23 @@ public interface CacheStoreManager extends GridCacheManager {
     public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last, boolean storeSessionEnded) throws IgniteCheckedException;
 
     /**
-     * End session initiated by write-behind store.
+     * Start session initiated by write-behind store.
      *
      * @throws IgniteCheckedException If failed.
      */
     public void writeBehindSessionInit() throws IgniteCheckedException;
 
     /**
+     * Notifies cache store session listeners.
+     *
+     * This method is called by the write-behind store when the back-pressure mechanism is initiated.
+     * It is assumed that the cache store session has already been started by CacheStoreManager.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void writeBehindCacheStoreSessionListenerStart()  throws IgniteCheckedException;
+
+    /**
      * End session initiated by write-behind store.
      *
      * @param threwEx If exception was thrown.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 22c2381..e862c0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -829,6 +829,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     }
 
     /** {@inheritDoc} */
+    @Override public void writeBehindCacheStoreSessionListenerStart()  throws IgniteCheckedException {
+        assert sesHolder.get() != null;
+
+        notifyCacheStoreSessionListeners(sesHolder.get(), null, true);
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeBehindSessionEnd(boolean threwEx) throws IgniteCheckedException {
         sessionEnd0(null, threwEx);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
index 831b1b0..d7a13e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java
@@ -798,8 +798,14 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy
         Flusher flusher
     ) {
         try {
-            if (initSes && storeMgr != null)
-                storeMgr.writeBehindSessionInit();
+            if (storeMgr != null) {
+                if (initSes)
+                    storeMgr.writeBehindSessionInit();
+                else
+                    // The back-pressure mechanism is running, so the cache store session must
+                    // already have been initialized by storeMgr; only the listeners are notified here.
+                    storeMgr.writeBehindCacheStoreSessionListenerStart();
+            }
 
             boolean threwEx = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java
index b9095d0..c9a912a 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Logger;
 import javax.cache.Cache;
@@ -34,14 +35,15 @@ import javax.cache.configuration.FactoryBuilder;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.sql.DataSource;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
 import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore;
-import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.testframework.GridTestUtils;
 
 /**
  * This class tests that calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)}
@@ -61,6 +63,9 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
     /** */
     private static final AtomicInteger entryCnt = new AtomicInteger();
 
+    /** Number of store operations invoked while the store session had no attachment. */
+    private static final AtomicInteger uninitializedListenerCnt = new AtomicInteger();
+
     /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 1;
@@ -93,6 +98,8 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
         operations.clear();
 
         entryCnt.set(0);
+
+        uninitializedListenerCnt.set(0);
     }
 
     /**
@@ -136,6 +143,83 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
     }
 
     /**
+     * Tests that cache store session listeners are notified by write-behind store.
+     */
+    public void testFlushSingleValue() throws Exception {
+        CacheConfiguration cfg = cacheConfiguration(getTestIgniteInstanceName());
+
+        cfg.setName("back-pressure-control");
+        cfg.setWriteBehindBatchSize(2);
+        cfg.setWriteBehindFlushSize(2);
+        cfg.setWriteBehindFlushFrequency(1_000);
+        cfg.setWriteBehindCoalescing(true);
+
+        IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(cfg);
+
+        try {
+            int nUploaders = 5;
+
+            final CyclicBarrier barrier = new CyclicBarrier(nUploaders);
+
+            IgniteInternalFuture[] uploaders = new IgniteInternalFuture[nUploaders];
+
+            for (int i = 0; i < nUploaders; ++i) {
+                uploaders[i] = GridTestUtils.runAsync(
+                    new Uploader(cache, barrier, i * CNT),
+                    "uploader-" + i);
+            }
+
+            for (int i = 0; i < nUploaders; ++i)
+                uploaders[i].get();
+
+            assertEquals("Uninitialized cache store session listener.", 0, uninitializedListenerCnt.get());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
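+     * Waits on a shared barrier, then puts {@code CNT} consecutive keys into the cache starting at {@code start}.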
+     */
+    public static class Uploader implements Runnable {
+        /** */
+        private final int start;
+
+        /** */
+        private final CyclicBarrier barrier;
+
+        /** */
+        private final IgniteCache<Object, Object> cache;
+
+        /**
+         * @param cache Ignite cache.
+         * @param barrier Cyclic barrier.
+         * @param start Key index.
+         */
+        public Uploader(IgniteCache<Object, Object> cache, CyclicBarrier barrier, int start) {
+            this.cache = cache;
+
+            this.barrier = barrier;
+
+            this.start = start;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                barrier.await();
+
+                for (int i = start; i < start + CNT; ++i)
+                    cache.put(i, i);
+            }
+            catch (Exception e) {
+                fail("Unexpected exception [" + e + "]");
+            }
+        }
+    }
+
+    /**
      * @param startedSessions Number of expected sessions.
      */
     private void checkSessionCounters(int startedSessions) {
@@ -145,6 +229,8 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
 
             assertEquals(CNT, entryCnt.get());
 
+            assertEquals("Uninitialized cache store session listener.", 0, uninitializedListenerCnt.get());
+
             checkOpCount(operations, OperationType.SESSION_START, startedSessions);
 
             checkOpCount(operations, OperationType.SESSION_END, startedSessions);
@@ -201,18 +287,19 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
      * Test cache store session listener.
      */
     public static class TestCacheStoreSessionListener extends CacheJdbcStoreSessionListener {
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
         /** {@inheritDoc} */
         @Override public void onSessionStart(CacheStoreSession ses) {
             operations.add(OperationType.SESSION_START);
+
+            if (ses.attachment() == null)
+                ses.attach(new Object());
         }
 
         /** {@inheritDoc} */
         @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
             operations.add(OperationType.SESSION_END);
+
+            ses.attach(null);
         }
     }
 
@@ -224,31 +311,45 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb
      */
     public static class EmptyCacheStore extends CacheStoreAdapter<Object, Object> {
         /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
+        @CacheStoreSessionResource
+        private CacheStoreSession ses;
 
         /** {@inheritDoc} */
         @Override public Object load(Object key) throws CacheLoaderException {
             entryCnt.getAndIncrement();
+
+            if (ses.attachment() == null)
+                uninitializedListenerCnt.incrementAndGet();
+
             return null;
         }
 
         /** {@inheritDoc} */
         @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) {
             entryCnt.addAndGet(entries.size());
+
+            if (ses.attachment() == null)
+                uninitializedListenerCnt.incrementAndGet();
         }
 
         /** {@inheritDoc} */
         @Override public void write(Cache.Entry entry) throws CacheWriterException {
+            if (ses.attachment() == null)
+                uninitializedListenerCnt.incrementAndGet();
         }
 
         /** {@inheritDoc} */
         @Override public void deleteAll(Collection<?> keys) {
             entryCnt.addAndGet(keys.size());
+
+            if (ses.attachment() == null)
+                uninitializedListenerCnt.incrementAndGet();
         }
 
         /** {@inheritDoc} */
         @Override public void delete(Object key) throws CacheWriterException {
+            if (ses.attachment() == null)
+                uninitializedListenerCnt.incrementAndGet();
         }
     }