You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/05/11 23:02:35 UTC
[geode] branch develop updated: Feature/geode 5173 1 (#1948)
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 52bdb96 Feature/geode 5173 1 (#1948)
52bdb96 is described below
commit 52bdb96e7c073ed623f4f53e1e6083f849bf2863
Author: pivotal-eshu <es...@pivotal.io>
AuthorDate: Fri May 11 16:02:30 2018 -0700
Feature/geode 5173 1 (#1948)
* GEODE-5173: A transaction will fault in the value from disk if the in-memory value is Token.NOT_AVAILABLE or the entry is evicted (isEvicted()).
---
.../geode/internal/cache/InternalRegion.java | 2 +-
.../apache/geode/internal/cache/LocalRegion.java | 22 +-
.../geode/internal/cache/NonLocalRegionEntry.java | 5 +
.../org/apache/geode/internal/cache/Oplog.java | 4 +
.../geode/internal/cache/ProxyRegionMap.java | 5 +
.../apache/geode/internal/cache/RegionEntry.java | 2 +
.../geode/internal/cache/ValidatingDiskRegion.java | 5 +
.../cache/entries/AbstractRegionEntry.java | 5 +
.../PersistentRegionTransactionDUnitTest.java | 242 +++++++++++++++++++++
9 files changed, 278 insertions(+), 14 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 1a311a4..91a4a52 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -314,7 +314,7 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
void setInUseByTransaction(boolean b);
- void txLRUStart();
+ boolean txLRUStart();
void txLRUEnd();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 6f1c6b4..ec65991 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -8336,19 +8336,15 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
try {
synchronized (regionEntry) {
if (!regionEntry.isRemoved()) {
- if (regionEntry instanceof DiskEntry && regionEntry instanceof EvictableEntry) {
- EvictableEntry le = (EvictableEntry) regionEntry;
- if (le.isEvicted()) {
- // Handle the case where we fault in a disk entry
- txLRUStart();
- needsLRUCleanup = true;
-
- // Fault in the value from disk
- regionEntry.getValue(this);
- }
+ Object value = regionEntry.getValueInVM(this);
+ if (value == Token.NOT_AVAILABLE || regionEntry.isEvicted()) {
+ // Entry value is on disk
+ // Handle the case where we fault in an evicted disk entry
+ needsLRUCleanup = txLRUStart();
+ // Fault in the value from disk
+ value = regionEntry.getValue(this);
}
- Object value = regionEntry.getValueInVM(this);
/*
* The tx will need the raw value for identity comparison. Please see
* TXEntryState#checkForConflict(LocalRegion,Object)
@@ -8448,8 +8444,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
@Override
- public void txLRUStart() {
- this.entries.disableLruUpdateCallback();
+ public boolean txLRUStart() {
+ return this.entries.disableLruUpdateCallback();
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
index 1d96131..6fd558d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
@@ -608,4 +608,9 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
boolean isEntryUpdate) {
throw new IllegalStateException("Should never be called");
}
+
+ @Override
+ public boolean isEvicted() {
+ return false;
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index 93d856d..2ff97d5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -7285,6 +7285,10 @@ public class Oplog implements CompactableOplog, Flushable {
throw new IllegalStateException("Should never be called");
}
+ @Override
+ public boolean isEvicted() {
+ return false;
+ }
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
index bd3059d..26d95a6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
@@ -785,6 +785,11 @@ class ProxyRegionMap implements RegionMap {
EntryEventImpl event, boolean isEntryUpdate) {
throw new IllegalStateException("Should never be called");
}
+
+ @Override
+ public boolean isEvicted() {
+ return false;
+ }
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java
index c16ad23..24cb359 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java
@@ -461,4 +461,6 @@ public interface RegionEntry {
@Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE)
Object prepareValueForCache(RegionEntryContext context, Object value, EntryEventImpl event,
boolean isEntryUpdate);
+
+ boolean isEvicted();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java
index cc0908a..0823c7d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java
@@ -532,6 +532,11 @@ public class ValidatingDiskRegion extends DiskRegion implements DiskRecoveryStor
EntryEventImpl event, boolean isEntryUpdate) {
throw new IllegalStateException("Should never be called");
}
+
+ @Override
+ public boolean isEvicted() {
+ return false;
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
index 7f8b6d4..9c08bf0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
@@ -2235,4 +2235,9 @@ public abstract class AbstractRegionEntry implements HashRegionEntry<Object, Obj
public void returnToPool() {
// noop by default
}
+
+ @Override
+ public boolean isEvicted() {
+ return false;
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PersistentRegionTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PersistentRegionTransactionDUnitTest.java
new file mode 100644
index 0000000..8e7785b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PersistentRegionTransactionDUnitTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.util.stream.IntStream;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class PersistentRegionTransactionDUnitTest extends JUnit4CacheTestCase {
+
+ private VM server;
+ private VM client;
+ private static final int KEY = 5;
+ private static final String VALUE = "value 5";
+ private static final String REGIONNAME = "region";
+
+ @Rule
+ public DistributedRestoreSystemProperties restoreSystemProperties =
+ new DistributedRestoreSystemProperties();
+
+ @Rule
+ public DistributedDiskDirRule distributedDiskDir = new DistributedDiskDirRule();
+
+ @Before
+ public void allowTransactions() {
+ server = VM.getVM(0); // VM 0 hosts the cache server created by createServer()
+ client = VM.getVM(1); // VM 1 runs the ClientCache in the client* tests
+ server.invoke(() -> TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true); // set in the server VM only
+ }
+
+ @After
+ public void disallowTransactions() {
+ server.invoke(() -> TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = false); // reset the flag set in allowTransactions()
+
+ }
+
+ @Test
+ public void clientTransactionCanGetNotRecoveredEntryOnPersistentOverflowRegion()
+ throws Exception {
+ createServer(server, true, false); // overflow region, synchronous disk writes
+ putData(server);
+ server.invoke(() -> getCache().close()); // close the cache so the next createServer() starts from the persisted data
+ int port = createServer(server, true, false);
+
+ // Read KEY through a client PROXY region inside a transaction, then always roll back.
+ client.invoke(() -> {
+ ClientCacheFactory factory = new ClientCacheFactory().addPoolServer("localhost", port);
+ ClientCache cache = getClientCache(factory);
+ cache.getCacheTransactionManager().begin();
+ try {
+ assertEquals(VALUE, cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(REGIONNAME).get(KEY));
+ } finally {
+ cache.getCacheTransactionManager().rollback();
+ }
+ });
+ }
+
+ private void putData(final VM server) {
+ server.invoke(() -> {
+ IntStream.range(0, 20) // keys 0..19, each mapped to "value <key>", so KEY (5) maps to VALUE
+ .forEach(index -> getCache().getRegion(REGIONNAME).put(index, "value " + index));
+ });
+ }
+
+ private int createServer(final VM server, boolean isOverflow, boolean isAsyncDiskWrite) { // returns the started cache server's port
+ return server.invoke(() -> {
+ if (!isOverflow) {
+ System.setProperty(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, "false"); // presumably keeps values on disk at recovery — TODO confirm
+ }
+ CacheFactory cacheFactory = new CacheFactory();
+ Cache cache = getCache(cacheFactory);
+ cache.createDiskStoreFactory().setQueueSize(3).setTimeInterval(10000).create("disk");
+ if (isOverflow) {
+ cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT)
+ .setDiskSynchronous(!isAsyncDiskWrite).setDiskStoreName("disk")
+ .setEvictionAttributes(
+ EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK)) // LRU entry limit of 1
+ .create(REGIONNAME);
+ } else {
+ cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT).create(REGIONNAME); // NOTE(review): no setDiskStoreName("disk") here — confirm intended
+ }
+ CacheServer cacheServer = cache.addCacheServer();
+ cacheServer.start();
+ return cacheServer.getPort();
+ });
+ }
+
+ @Test
+ public void clientTransactionCanGetEvictedEntryOnPersistentOverflowRegion() throws Exception {
+ int port = createServer(server, true, false); // overflow region, synchronous disk writes
+ putData(server); // 20 puts against the LRU entry limit of 1 configured in createServer()
+ client.invoke(() -> {
+ ClientCacheFactory factory = new ClientCacheFactory().addPoolServer("localhost", port);
+ ClientCache cache = getClientCache(factory);
+ cache.getCacheTransactionManager().begin();
+ try {
+ assertEquals(VALUE, cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(REGIONNAME).get(KEY));
+ } finally {
+ cache.getCacheTransactionManager().rollback(); // always end the transaction, even if the assertion fails
+ }
+ });
+ }
+
+ @Test
+ public void transactionCanGetEvictedEntryOnPersistentOverflowRegion() throws Exception {
+ createServer(server, true, false); // overflow region, synchronous disk writes
+ putData(server);
+ server.invoke(() -> {
+ LocalRegion region = (LocalRegion) getCache().getRegion(REGIONNAME);
+ Awaitility.await().atMost(10, SECONDS) // wait until KEY's value is no longer held in memory
+ .until(() -> assertThat(region.getValueInVM(KEY)).isNull());
+ getCache().getCacheTransactionManager().begin();
+ try {
+ assertEquals(VALUE, region.get(KEY)); // the transactional get must still return the value
+ } finally {
+ getCache().getCacheTransactionManager().rollback(); // same accessor as begin()
+ }
+ });
+ }
+
+ @Test
+ public void transactionCanGetNotRecoveredEntryOnPersistentOverflowRegion() throws Exception {
+ createServer(server, true, false); // overflow region, synchronous disk writes
+ putData(server);
+ server.invoke(() -> getCache().close()); // close the cache so the next createServer() starts from the persisted data
+ createServer(server, true, false);
+ server.invoke(() -> {
+ LocalRegion region = (LocalRegion) getCache().getRegion(REGIONNAME);
+ getCache().getCacheTransactionManager().begin();
+ try {
+ assertEquals(VALUE, region.get(KEY)); // the transactional get must still return the value
+ } finally {
+ getCache().getCacheTransactionManager().rollback(); // same accessor as begin()
+ }
+ });
+ }
+
+ @Test
+ public void transactionCanGetNotRecoveredEntryOnPersistentRegion() throws Exception {
+ createServer(server, false, false); // persistent region without overflow
+ putData(server);
+ server.invoke(() -> getCache().close()); // close the cache so the next createServer() starts from the persisted data
+ createServer(server, false, false);
+ server.invoke(() -> {
+ LocalRegion region = (LocalRegion) getCache().getRegion(REGIONNAME);
+ assertThat(region.getValueInVM(KEY)).isNull(); // precondition: KEY's value is not in memory after restart
+ getCache().getCacheTransactionManager().begin();
+ try {
+ assertEquals(VALUE, region.get(KEY)); // the transactional get must still return the value
+ } finally {
+ getCache().getCacheTransactionManager().rollback(); // same accessor as begin()
+ }
+ });
+ }
+
+ @Test
+ public void clientTransactionCanGetNotRecoveredEntryOnPersistentRegion() throws Exception {
+ createServer(server, false, false); // persistent region without overflow
+ putData(server);
+ server.invoke(() -> getCache().close()); // close the cache so the next createServer() starts from the persisted data
+ int port = createServer(server, false, false);
+
+ // Read KEY through a client PROXY region inside a transaction, then always roll back.
+ client.invoke(() -> {
+ ClientCacheFactory factory = new ClientCacheFactory().addPoolServer("localhost", port);
+ ClientCache cache = getClientCache(factory);
+ cache.getCacheTransactionManager().begin();
+ try {
+ assertEquals(VALUE, cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(REGIONNAME).get(KEY));
+ } finally {
+ cache.getCacheTransactionManager().rollback();
+ }
+ });
+ }
+
+ @Test
+ public void transactionCanUpdateEntryOnAsyncOverflowRegion() throws Exception {
+ createServer(server, true, true); // overflow region with asynchronous disk writes
+ server.invoke(() -> {
+ Cache cache = getCache();
+ DiskStoreImpl diskStore = (DiskStoreImpl) cache.findDiskStore("disk");
+ LocalRegion region = (LocalRegion) cache.getRegion(REGIONNAME);
+ region.put(1, "value1");
+ region.put(2, "value2"); // causes key 1 to be evicted and to sit in the async queue
+ TXManagerImpl txManager = getCache().getTxManager();
+ txManager.begin();
+ assertNotEquals(Token.NOT_AVAILABLE, region.getValueInVM(1)); // JUnit order: (unexpected, actual)
+ region.put(1, "new value");
+ TransactionId txId = txManager.suspend(); // the puts below run while the transaction is suspended
+ region.put(3, "value3");
+ region.put(4, "value4");
+ diskStore.flush();
+ txManager.resume(txId);
+
+ txManager.commit();
+
+ assertEquals("new value", region.get(1)); // the committed update must be visible after the flush
+ });
+ }
+}
--
To stop receiving notification emails like this one, please contact
eshu11@apache.org.