You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/20 11:59:34 UTC

[ignite] branch master updated: IGNITE-11371 Return non-null value if read-through value store failed due to version mismatch - Fixes #6290.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new d9693a7  IGNITE-11371 Return non-null value if read-through value store failed due to version mismatch - Fixes #6290.
d9693a7 is described below

commit d9693a7136f935dbc937edc264a273d776a9b5b6
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Wed Mar 20 14:58:10 2019 +0300

    IGNITE-11371 Return non-null value if read-through value store failed due to version mismatch - Fixes #6290.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../processors/cache/GridCacheAdapter.java         |  16 +-
 .../cache/CacheGetRemoveSkipStoreTest.java         | 293 +++++++++++++++++++++
 .../testsuites/IgniteCacheMvccTestSuite4.java      |   2 +
 .../ignite/testsuites/IgniteCacheTestSuite4.java   |   2 +
 4 files changed, 312 insertions(+), 1 deletion(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 60be9d5..fca8abe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2181,10 +2181,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
                                                 entry.unswap();
 
+                                                GridCacheVersion newVer = ctx.versions().next();
+
                                                 EntryGetResult verVal = entry.versionedValue(
                                                     cacheVal,
                                                     res.version(),
-                                                    null,
+                                                    newVer,
                                                     expiry,
                                                     readerArgs);
 
@@ -2205,6 +2207,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                                         true,
                                                         needVer);
                                                 }
+                                                else {
+                                                    ctx.addResult(
+                                                        map,
+                                                        key,
+                                                        new EntryGetResult(cacheVal, res.version()),
+                                                        skipVals,
+                                                        keepCacheObjects,
+                                                        deserializeBinary,
+                                                        false,
+                                                        needVer
+                                                    );
+                                                }
 
                                                 if (tx0 == null || (!tx0.implicit() &&
                                                     tx0.isolation() == READ_COMMITTED))
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetRemoveSkipStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetRemoveSkipStoreTest.java
new file mode 100644
index 0000000..a4e8969
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetRemoveSkipStoreTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.Cache;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class CacheGetRemoveSkipStoreTest extends GridCommonAbstractTest {
+    /** */
+    public static final String TEST_CACHE = "testCache";
+
+    /** Number of nodes for the test. */
+    public static final int GRID_CNT = 3;
+
+    /** Read semaphore to delay read-through. */
+    private static volatile Semaphore readSemaphore;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (igniteInstanceName.contains("client"))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /**
+     * Creates cache configuration with the given atomicity mode and number of backups.
+     *
+     * @param atomicity Atomicity mode.
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> configuration(
+        CacheAtomicityMode atomicity,
+        int backups
+    ) {
+        return new CacheConfiguration<Integer, Integer>()
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setName(TEST_CACHE)
+            .setAtomicityMode(atomicity)
+            .setBackups(backups)
+            .setCacheStoreFactory(TestCacheStore::new)
+            .setReadThrough(true)
+            .setWriteThrough(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setUp() throws Exception {
+        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
+
+        super.setUp();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        readSemaphore = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(GRID_CNT);
+
+        startGrid("client");
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testTransactionalNoBackups() throws Exception {
+        checkNoNullReads(grid("client"), configuration(CacheAtomicityMode.TRANSACTIONAL, 0));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testTransactionalOneBackup() throws Exception {
+        checkNoNullReads(grid("client"), configuration(CacheAtomicityMode.TRANSACTIONAL, 1));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testAtomicNoBackups() throws Exception {
+        checkNoNullReads(grid("client"), configuration(CacheAtomicityMode.ATOMIC, 0));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testAtomicOneBackup() throws Exception {
+        checkNoNullReads(grid("client"), configuration(CacheAtomicityMode.ATOMIC, 1));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    private void checkNoNullReads(Ignite client, CacheConfiguration<Integer, Integer> ccfg) throws Exception {
+        client.getOrCreateCache(ccfg);
+
+        try {
+            checkNoNullReads(client.cache(ccfg.getName()), 0);
+            checkNoNullReads(grid(0).cache(ccfg.getName()), primaryKey(grid(0).cache(ccfg.getName())));
+
+            if (ccfg.getBackups() > 0)
+                checkNoNullReads(grid(0).cache(ccfg.getName()), backupKey(grid(0).cache(ccfg.getName())));
+        }
+        finally {
+            client.destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param cache Cache to check.
+     * @param key Key to check.
+     * @throws Exception If failed.
+     */
+    private void checkNoNullReads(IgniteCache<Integer, Integer> cache, Integer key) throws Exception {
+        assertNotNull(cache.get(key));
+
+        AtomicReference<String> failure = new AtomicReference<>();
+        AtomicBoolean stop = new AtomicBoolean(false);
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
+            while (!stop.get()) {
+                Object res = cache.get(key);
+
+                if (res == null)
+                    failure.compareAndSet(null, "Failed in thread: " + Thread.currentThread().getName());
+            }
+        });
+
+        for (int i = 0; i < 100; i++)
+            cache.remove(key);
+
+        U.sleep(100);
+
+        stop.set(true);
+        fut.get();
+
+        assertNotNull(cache.get(key));
+
+        assertNull(failure.get());
+    }
+
+    /**
+     */
+    @Test
+    public void testRemoveIsAppliedTransactionalNoBackups() {
+        checkRemoveIsApplied(grid("client"), configuration(CacheAtomicityMode.TRANSACTIONAL, 0));
+    }
+
+    /**
+     */
+    @Test
+    public void testRemoveIsAppliedTransactionalOneBackups() {
+        checkRemoveIsApplied(grid("client"), configuration(CacheAtomicityMode.TRANSACTIONAL, 1));
+    }
+
+    /**
+     */
+    @Test
+    public void testRemoveIsAppliedAtomicNoBackups() {
+        checkRemoveIsApplied(grid("client"), configuration(CacheAtomicityMode.ATOMIC, 0));
+    }
+
+    /**
+     */
+    @Test
+    public void testRemoveIsAppliedAtomicOneBackups() {
+        checkRemoveIsApplied(grid("client"), configuration(CacheAtomicityMode.ATOMIC, 1));
+    }
+
+    /**
+     * @param client Client to test.
+     * @param ccfg Cache configuration
+     */
+    public void checkRemoveIsApplied(Ignite client, CacheConfiguration<Integer, Integer> ccfg) {
+        // Allow first read.
+        readSemaphore = new Semaphore(1);
+
+        IgniteCache<Integer, Integer> cache = client.getOrCreateCache(ccfg);
+
+        try {
+            Integer key = 1;
+
+            assertNotNull(cache.get(key));
+
+            Ignite primary = grid(client.affinity(ccfg.getName()).mapKeyToNode(key));
+
+            assertNotNull(primary.cache(ccfg.getName()).localPeek(key));
+
+            // Read-through will be blocked on semaphore.
+            IgniteFuture<Integer> getFut = cache.getAsync(key);
+
+            cache.remove(key);
+
+            assertNull(primary.cache(ccfg.getName()).localPeek(key)); // Check that remove actually takes place.
+
+            readSemaphore.release(2);
+
+            assertNotNull(getFut.get());
+            assertNotNull(cache.get(key));
+            assertNotNull(primary.cache(ccfg.getName()).localPeek(key));
+        }
+        finally {
+            client.destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Dummy cache store which delays key load and loads a predefined value.
+     */
+    public static class TestCacheStore extends CacheStoreAdapter<Integer, Integer> {
+        /** */
+        static final Integer CONSTANT_VALUE = -1;
+
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer s) throws CacheLoaderException {
+            try {
+                if (readSemaphore != null)
+                    readSemaphore.acquire();
+                else
+                    U.sleep(1000);
+            }
+            catch (Exception e) {
+                throw new CacheLoaderException(e);
+            }
+
+            return CONSTANT_VALUE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object o) throws CacheWriterException {
+            // No-op.
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite4.java
index c2e74dd..005677f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite4.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticSerial
 import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticReadCommittedSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticRepeatableReadSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticSerializableSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheGetRemoveSkipStoreTest;
 import org.apache.ignite.internal.processors.cache.CacheOffheapMapEntrySelfTest;
 import org.apache.ignite.internal.processors.cache.CacheReadThroughAtomicRestartSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheReadThroughLocalAtomicRestartSelfTest;
@@ -143,6 +144,7 @@ public class IgniteCacheMvccTestSuite4 {
         ignoredTests.add(IgniteCacheAtomicNoReadThroughTest.class);
         ignoredTests.add(IgniteCacheAtomicNearEnabledNoReadThroughTest.class);
         ignoredTests.add(IgniteCacheAtomicLocalNoReadThroughTest.class);
+        ignoredTests.add(CacheGetRemoveSkipStoreTest.class);
         ignoredTests.add(IgniteCacheAtomicNoLoadPreviousValueTest.class);
         ignoredTests.add(IgniteCacheAtomicNearEnabledNoLoadPreviousValueTest.class);
         ignoredTests.add(IgniteCacheAtomicLocalNoLoadPreviousValueTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 5a23158..7c19909 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticSerial
 import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticReadCommittedSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticRepeatableReadSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticSerializableSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheGetRemoveSkipStoreTest;
 import org.apache.ignite.internal.processors.cache.CacheOffheapMapEntrySelfTest;
 import org.apache.ignite.internal.processors.cache.CacheReadThroughAtomicRestartSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheReadThroughLocalAtomicRestartSelfTest;
@@ -207,6 +208,7 @@ public class IgniteCacheTestSuite4 {
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheTxNoReadThroughTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheTxNearEnabledNoReadThroughTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheTxLocalNoReadThroughTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CacheGetRemoveSkipStoreTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheAtomicNoLoadPreviousValueTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCacheAtomicNearEnabledNoLoadPreviousValueTest.class, ignoredTests);