You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/20 11:59:34 UTC
[ignite] branch master updated: IGNITE-11371 Return non-null value
if read-through value store failed due to version mismatch - Fixes #6290.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d9693a7 IGNITE-11371 Return non-null value if read-through value store failed due to version mismatch - Fixes #6290.
d9693a7 is described below
commit d9693a7136f935dbc937edc264a273d776a9b5b6
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Wed Mar 20 14:58:10 2019 +0300
IGNITE-11371 Return non-null value if read-through value store failed due to version mismatch - Fixes #6290.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../processors/cache/GridCacheAdapter.java | 16 +-
.../cache/CacheGetRemoveSkipStoreTest.java | 293 +++++++++++++++++++++
.../testsuites/IgniteCacheMvccTestSuite4.java | 2 +
.../ignite/testsuites/IgniteCacheTestSuite4.java | 2 +
4 files changed, 312 insertions(+), 1 deletion(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 60be9d5..fca8abe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2181,10 +2181,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
entry.unswap();
+ GridCacheVersion newVer = ctx.versions().next();
+
EntryGetResult verVal = entry.versionedValue(
cacheVal,
res.version(),
- null,
+ newVer,
expiry,
readerArgs);
@@ -2205,6 +2207,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
true,
needVer);
}
+ else {
+ ctx.addResult(
+ map,
+ key,
+ new EntryGetResult(cacheVal, res.version()),
+ skipVals,
+ keepCacheObjects,
+ deserializeBinary,
+ false,
+ needVer
+ );
+ }
if (tx0 == null || (!tx0.implicit() &&
tx0.isolation() == READ_COMMITTED))
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetRemoveSkipStoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetRemoveSkipStoreTest.java
new file mode 100644
index 0000000..a4e8969
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetRemoveSkipStoreTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.Cache;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.MvccFeatureChecker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Checks that gets on a read-through cache never return {@code null} while the same key is concurrently removed.
+ */
+public class CacheGetRemoveSkipStoreTest extends GridCommonAbstractTest {
+ /** Name of the cache created by each test. */
+ public static final String TEST_CACHE = "testCache";
+
+ /** Number of nodes started before the tests, not counting the client node. */
+ public static final int GRID_CNT = 3;
+
+ /** Semaphore that delays read-through in the test store; when {@code null} the store sleeps instead. */
+ private static volatile Semaphore readSemaphore;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ if (igniteInstanceName.contains("client"))
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /**
+ * Creates a partitioned, read-through (not write-through) cache configuration backed by {@link TestCacheStore}.
+ *
+ * @param atomicity Atomicity mode.
+ * @param backups Number of backups.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> configuration(
+ CacheAtomicityMode atomicity,
+ int backups
+ ) {
+ return new CacheConfiguration<Integer, Integer>()
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setName(TEST_CACHE)
+ .setAtomicityMode(atomicity)
+ .setBackups(backups)
+ .setCacheStoreFactory(TestCacheStore::new)
+ .setReadThrough(true)
+ .setWriteThrough(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setUp() throws Exception {
+ MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
+
+ super.setUp();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ readSemaphore = null; // Reset so the next test starts without a semaphore.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(GRID_CNT);
+
+ startGrid("client");
+ }
+
+ /** No-null-reads check for a transactional cache without backups.
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testTransactionalNoBackups() throws Exception {
+ checkNoNullReads(grid("client"), configuration(CacheAtomicityMode.TRANSACTIONAL, 0));
+ }
+
+ /** No-null-reads check for a transactional cache with one backup.
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testTransactionalOneBackup() throws Exception {
+ checkNoNullReads(grid("client"), configuration(CacheAtomicityMode.TRANSACTIONAL, 1));
+ }
+
+ /** No-null-reads check for an atomic cache without backups.
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testAtomicNoBackups() throws Exception {
+ checkNoNullReads(grid("client"), configuration(CacheAtomicityMode.ATOMIC, 0));
+ }
+
+ /** No-null-reads check for an atomic cache with one backup.
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testAtomicOneBackup() throws Exception {
+ checkNoNullReads(grid("client"), configuration(CacheAtomicityMode.ATOMIC, 1));
+ }
+
+ /** Creates the cache, runs the checks via the client and via grid 0 (primary and, if any, backup key), then destroys it.
+ * @throws Exception if failed.
+ */
+ private void checkNoNullReads(Ignite client, CacheConfiguration<Integer, Integer> ccfg) throws Exception {
+ client.getOrCreateCache(ccfg);
+
+ try {
+ checkNoNullReads(client.cache(ccfg.getName()), 0);
+ checkNoNullReads(grid(0).cache(ccfg.getName()), primaryKey(grid(0).cache(ccfg.getName())));
+
+ if (ccfg.getBackups() > 0)
+ checkNoNullReads(grid(0).cache(ccfg.getName()), backupKey(grid(0).cache(ccfg.getName())));
+ }
+ finally {
+ client.destroyCache(ccfg.getName());
+ }
+ }
+
+ /** Removes the key 100 times while a background thread keeps reading it; fails if any read returned {@code null}.
+ * @param cache Cache to check.
+ * @param key Key to check.
+ * @throws Exception If failed.
+ */
+ private void checkNoNullReads(IgniteCache<Integer, Integer> cache, Integer key) throws Exception {
+ assertNotNull(cache.get(key));
+
+ AtomicReference<String> failure = new AtomicReference<>();
+ AtomicBoolean stop = new AtomicBoolean(false);
+
+ IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
+ while (!stop.get()) {
+ Object res = cache.get(key);
+
+ if (res == null)
+ failure.compareAndSet(null, "Failed in thread: " + Thread.currentThread().getName());
+ }
+ });
+
+ for (int i = 0; i < 100; i++)
+ cache.remove(key);
+
+ U.sleep(100); // Let the reader keep running briefly after the last remove.
+
+ stop.set(true);
+ fut.get();
+
+ assertNotNull(cache.get(key));
+
+ assertNull(failure.get());
+ }
+
+ /** Remove-is-applied check for a transactional cache without backups.
+ */
+ @Test
+ public void testRemoveIsAppliedTransactionalNoBackups() {
+ checkRemoveIsApplied(grid("client"), configuration(CacheAtomicityMode.TRANSACTIONAL, 0));
+ }
+
+ /** Remove-is-applied check for a transactional cache with one backup.
+ */
+ @Test
+ public void testRemoveIsAppliedTransactionalOneBackups() {
+ checkRemoveIsApplied(grid("client"), configuration(CacheAtomicityMode.TRANSACTIONAL, 1));
+ }
+
+ /** Remove-is-applied check for an atomic cache without backups.
+ */
+ @Test
+ public void testRemoveIsAppliedAtomicNoBackups() {
+ checkRemoveIsApplied(grid("client"), configuration(CacheAtomicityMode.ATOMIC, 0));
+ }
+
+ /** Remove-is-applied check for an atomic cache with one backup.
+ */
+ @Test
+ public void testRemoveIsAppliedAtomicOneBackups() {
+ checkRemoveIsApplied(grid("client"), configuration(CacheAtomicityMode.ATOMIC, 1));
+ }
+
+ /** Checks that a remove issued after an async get clears the primary's entry and that later reads are non-null.
+ * @param client Client to test.
+ * @param ccfg Cache configuration.
+ */
+ public void checkRemoveIsApplied(Ignite client, CacheConfiguration<Integer, Integer> ccfg) {
+ // Allow first read.
+ readSemaphore = new Semaphore(1);
+
+ IgniteCache<Integer, Integer> cache = client.getOrCreateCache(ccfg);
+
+ try {
+ Integer key = 1;
+
+ assertNotNull(cache.get(key));
+
+ Ignite primary = grid(client.affinity(ccfg.getName()).mapKeyToNode(key));
+
+ assertNotNull(primary.cache(ccfg.getName()).localPeek(key));
+
+ // Read-through will be blocked on semaphore.
+ IgniteFuture<Integer> getFut = cache.getAsync(key);
+
+ cache.remove(key);
+
+ assertNull(primary.cache(ccfg.getName()).localPeek(key)); // Check that remove actually takes place.
+
+ readSemaphore.release(2); // Add permits so store loads waiting on (or following) this point can proceed.
+
+ assertNotNull(getFut.get());
+ assertNotNull(cache.get(key));
+ assertNotNull(primary.cache(ccfg.getName()).localPeek(key));
+ }
+ finally {
+ client.destroyCache(ccfg.getName());
+ }
+ }
+
+ /**
+ * Dummy cache store which delays key load and loads a predefined value.
+ */
+ public static class TestCacheStore extends CacheStoreAdapter<Integer, Integer> {
+ /** Value returned by every {@link #load(Integer)} call. */
+ static final Integer CONSTANT_VALUE = -1;
+
+ /** {@inheritDoc} */
+ @Override public Integer load(Integer s) throws CacheLoaderException {
+ try {
+ if (readSemaphore != null)
+ readSemaphore.acquire(); // Wait until the test grants a permit.
+ else
+ U.sleep(1000); // No semaphore installed: just delay the load.
+ }
+ catch (Exception e) {
+ throw new CacheLoaderException(e);
+ }
+
+ return CONSTANT_VALUE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object o) throws CacheWriterException {
+ // No-op.
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite4.java
index c2e74dd..005677f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite4.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticSerial
import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticReadCommittedSelfTest;
import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticRepeatableReadSelfTest;
import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticSerializableSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheGetRemoveSkipStoreTest;
import org.apache.ignite.internal.processors.cache.CacheOffheapMapEntrySelfTest;
import org.apache.ignite.internal.processors.cache.CacheReadThroughAtomicRestartSelfTest;
import org.apache.ignite.internal.processors.cache.CacheReadThroughLocalAtomicRestartSelfTest;
@@ -143,6 +144,7 @@ public class IgniteCacheMvccTestSuite4 {
ignoredTests.add(IgniteCacheAtomicNoReadThroughTest.class);
ignoredTests.add(IgniteCacheAtomicNearEnabledNoReadThroughTest.class);
ignoredTests.add(IgniteCacheAtomicLocalNoReadThroughTest.class);
+ ignoredTests.add(CacheGetRemoveSkipStoreTest.class);
ignoredTests.add(IgniteCacheAtomicNoLoadPreviousValueTest.class);
ignoredTests.add(IgniteCacheAtomicNearEnabledNoLoadPreviousValueTest.class);
ignoredTests.add(IgniteCacheAtomicLocalNoLoadPreviousValueTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 5a23158..7c19909 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.CacheGetEntryOptimisticSerial
import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticReadCommittedSelfTest;
import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticRepeatableReadSelfTest;
import org.apache.ignite.internal.processors.cache.CacheGetEntryPessimisticSerializableSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheGetRemoveSkipStoreTest;
import org.apache.ignite.internal.processors.cache.CacheOffheapMapEntrySelfTest;
import org.apache.ignite.internal.processors.cache.CacheReadThroughAtomicRestartSelfTest;
import org.apache.ignite.internal.processors.cache.CacheReadThroughLocalAtomicRestartSelfTest;
@@ -207,6 +208,7 @@ public class IgniteCacheTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, IgniteCacheTxNoReadThroughTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheTxNearEnabledNoReadThroughTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheTxLocalNoReadThroughTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CacheGetRemoveSkipStoreTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheAtomicNoLoadPreviousValueTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheAtomicNearEnabledNoLoadPreviousValueTest.class, ignoredTests);