You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/05/11 23:02:35 UTC

[geode] branch develop updated: Feature/geode 5173 1 (#1948)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 52bdb96  Feature/geode 5173 1 (#1948)
52bdb96 is described below

commit 52bdb96e7c073ed623f4f53e1e6083f849bf2863
Author: pivotal-eshu <es...@pivotal.io>
AuthorDate: Fri May 11 16:02:30 2018 -0700

    Feature/geode 5173 1 (#1948)
    
    * GEODE-5173: Transaction will fault in value if value is Token.NOT_AVAILABLE or isEvicted.
---
 .../geode/internal/cache/InternalRegion.java       |   2 +-
 .../apache/geode/internal/cache/LocalRegion.java   |  22 +-
 .../geode/internal/cache/NonLocalRegionEntry.java  |   5 +
 .../org/apache/geode/internal/cache/Oplog.java     |   4 +
 .../geode/internal/cache/ProxyRegionMap.java       |   5 +
 .../apache/geode/internal/cache/RegionEntry.java   |   2 +
 .../geode/internal/cache/ValidatingDiskRegion.java |   5 +
 .../cache/entries/AbstractRegionEntry.java         |   5 +
 .../PersistentRegionTransactionDUnitTest.java      | 242 +++++++++++++++++++++
 9 files changed, 278 insertions(+), 14 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 1a311a4..91a4a52 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -314,7 +314,7 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
 
   void setInUseByTransaction(boolean b);
 
-  void txLRUStart();
+  boolean txLRUStart();
 
   void txLRUEnd();
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 6f1c6b4..ec65991 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -8336,19 +8336,15 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       try {
         synchronized (regionEntry) {
           if (!regionEntry.isRemoved()) {
-            if (regionEntry instanceof DiskEntry && regionEntry instanceof EvictableEntry) {
-              EvictableEntry le = (EvictableEntry) regionEntry;
-              if (le.isEvicted()) {
-                // Handle the case where we fault in a disk entry
-                txLRUStart();
-                needsLRUCleanup = true;
-
-                // Fault in the value from disk
-                regionEntry.getValue(this);
-              }
+            Object value = regionEntry.getValueInVM(this);
+            if (value == Token.NOT_AVAILABLE || regionEntry.isEvicted()) {
+              // Entry value is on disk
+              // Handle the case where we fault in an evicted disk entry
+              needsLRUCleanup = txLRUStart();
+              // Fault in the value from disk
+              value = regionEntry.getValue(this);
             }
 
-            Object value = regionEntry.getValueInVM(this);
             /*
              * The tx will need the raw value for identity comparison. Please see
              * TXEntryState#checkForConflict(LocalRegion,Object)
@@ -8448,8 +8444,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   }
 
   @Override
-  public void txLRUStart() {
-    this.entries.disableLruUpdateCallback();
+  public boolean txLRUStart() {
+    return this.entries.disableLruUpdateCallback();
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
index 1d96131..6fd558d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
@@ -608,4 +608,9 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
       boolean isEntryUpdate) {
     throw new IllegalStateException("Should never be called");
   }
+
+  @Override
+  public boolean isEvicted() {
+    return false;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index 93d856d..2ff97d5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -7285,6 +7285,10 @@ public class Oplog implements CompactableOplog, Flushable {
       throw new IllegalStateException("Should never be called");
     }
 
+    @Override
+    public boolean isEvicted() {
+      return false;
+    }
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
index bd3059d..26d95a6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
@@ -785,6 +785,11 @@ class ProxyRegionMap implements RegionMap {
         EntryEventImpl event, boolean isEntryUpdate) {
       throw new IllegalStateException("Should never be called");
     }
+
+    @Override
+    public boolean isEvicted() {
+      return false;
+    }
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java
index c16ad23..24cb359 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java
@@ -461,4 +461,6 @@ public interface RegionEntry {
   @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE)
   Object prepareValueForCache(RegionEntryContext context, Object value, EntryEventImpl event,
       boolean isEntryUpdate);
+
+  boolean isEvicted();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java
index cc0908a..0823c7d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java
@@ -532,6 +532,11 @@ public class ValidatingDiskRegion extends DiskRegion implements DiskRecoveryStor
         EntryEventImpl event, boolean isEntryUpdate) {
       throw new IllegalStateException("Should never be called");
     }
+
+    @Override
+    public boolean isEvicted() {
+      return false;
+    }
   }
 
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
index 7f8b6d4..9c08bf0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
@@ -2235,4 +2235,9 @@ public abstract class AbstractRegionEntry implements HashRegionEntry<Object, Obj
   public void returnToPool() {
     // noop by default
   }
+
+  @Override
+  public boolean isEvicted() {
+    return false;
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PersistentRegionTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PersistentRegionTransactionDUnitTest.java
new file mode 100644
index 0000000..8e7785b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PersistentRegionTransactionDUnitTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.util.stream.IntStream;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class PersistentRegionTransactionDUnitTest extends JUnit4CacheTestCase {
+
+  private VM server;
+  private VM client;
+  private static final int KEY = 5;
+  private static final String VALUE = "value 5";
+  private static final String REGIONNAME = "region";
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  @Rule
+  public DistributedDiskDirRule distributedDiskDir = new DistributedDiskDirRule();
+
+  @Before
+  public void allowTransactions() {
+    server = VM.getVM(0);
+    client = VM.getVM(1);
+    server.invoke(() -> TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true);
+  }
+
+  @After
+  public void disallowTransactions() {
+    server.invoke(() -> TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = false);
+
+  }
+
+  @Test
+  public void clientTransactionCanGetNotRecoveredEntryOnPersistentOverflowRegion()
+      throws Exception {
+    createServer(server, true, false);
+    putData(server);
+    server.invoke(() -> getCache().close());
+    int port = createServer(server, true, false);
+
+
+    client.invoke(() -> {
+      ClientCacheFactory factory = new ClientCacheFactory().addPoolServer("localhost", port);
+      ClientCache cache = getClientCache(factory);
+      cache.getCacheTransactionManager().begin();
+      try {
+        assertEquals(VALUE, cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+            .create(REGIONNAME).get(KEY));
+      } finally {
+        cache.getCacheTransactionManager().rollback();
+      }
+    });
+  }
+
+  private void putData(final VM server) {
+    server.invoke(() -> {
+      IntStream.range(0, 20)
+          .forEach(index -> getCache().getRegion(REGIONNAME).put(index, "value " + index));
+    });
+  }
+
+  private int createServer(final VM server, boolean isOverflow, boolean isAsyncDiskWrite) {
+    return server.invoke(() -> {
+      if (!isOverflow) {
+        System.setProperty(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, "false");
+      }
+      CacheFactory cacheFactory = new CacheFactory();
+      Cache cache = getCache(cacheFactory);
+      cache.createDiskStoreFactory().setQueueSize(3).setTimeInterval(10000).create("disk");
+      if (isOverflow) {
+        cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT)
+            .setDiskSynchronous(!isAsyncDiskWrite).setDiskStoreName("disk")
+            .setEvictionAttributes(
+                EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK))
+            .create(REGIONNAME);
+      } else {
+        cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT).create(REGIONNAME);
+      }
+      CacheServer cacheServer = cache.addCacheServer();
+      cacheServer.start();
+      return cacheServer.getPort();
+    });
+  }
+
+  @Test
+  public void clientTransactionCanGetEvictedEntryOnPersistentOverflowRegion() throws Exception {
+    int port = createServer(server, true, false);
+    putData(server);
+    client.invoke(() -> {
+      ClientCacheFactory factory = new ClientCacheFactory().addPoolServer("localhost", port);
+      ClientCache cache = getClientCache(factory);
+      cache.getCacheTransactionManager().begin();
+      try {
+        assertEquals(VALUE, cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+            .create(REGIONNAME).get(KEY));
+      } finally {
+        cache.getCacheTransactionManager().rollback();
+      }
+    });
+  }
+
+  @Test
+  public void transactionCanGetEvictedEntryOnPersistentOverflowRegion() throws Exception {
+    createServer(server, true, false);
+    putData(server);
+    server.invoke(() -> {
+      LocalRegion region = (LocalRegion) getCache().getRegion(REGIONNAME);
+      Awaitility.await().atMost(10, SECONDS)
+          .until(() -> assertThat(region.getValueInVM(KEY)).isNull());
+      getCache().getCacheTransactionManager().begin();
+      try {
+        assertEquals(VALUE, region.get(KEY));
+      } finally {
+        cache.getCacheTransactionManager().rollback();
+      }
+    });
+  }
+
+  @Test
+  public void transactionCanGetNotRecoveredEntryOnPersistentOverflowRegion() throws Exception {
+    createServer(server, true, false);
+    putData(server);
+    server.invoke(() -> getCache().close());
+    createServer(server, true, false);
+    server.invoke(() -> {
+      LocalRegion region = (LocalRegion) getCache().getRegion("region");
+      getCache().getCacheTransactionManager().begin();
+      try {
+        assertEquals(VALUE, region.get(KEY));
+      } finally {
+        cache.getCacheTransactionManager().rollback();
+      }
+    });
+  }
+
+  @Test
+  public void transactionCanGetNotRecoveredEntryOnPersistentRegion() throws Exception {
+    createServer(server, false, false);
+    putData(server);
+    server.invoke(() -> getCache().close());
+    createServer(server, false, false);
+    server.invoke(() -> {
+      LocalRegion region = (LocalRegion) getCache().getRegion("region");
+      assertThat(region.getValueInVM(KEY)).isNull();
+      getCache().getCacheTransactionManager().begin();
+      try {
+        assertEquals(VALUE, region.get(KEY));
+      } finally {
+        cache.getCacheTransactionManager().rollback();
+      }
+    });
+  }
+
+  @Test
+  public void clientTransactionCanGetNotRecoveredEntryOnPersistentRegion() throws Exception {
+    createServer(server, false, false);
+    putData(server);
+    server.invoke(() -> getCache().close());
+    int port = createServer(server, false, false);
+
+
+    client.invoke(() -> {
+      ClientCacheFactory factory = new ClientCacheFactory().addPoolServer("localhost", port);
+      ClientCache cache = getClientCache(factory);
+      cache.getCacheTransactionManager().begin();
+      try {
+        assertEquals(VALUE, cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+            .create(REGIONNAME).get(KEY));
+      } finally {
+        cache.getCacheTransactionManager().rollback();
+      }
+    });
+  }
+
+  @Test
+  public void transactionCanUpdateEntryOnAsyncOverflowRegion() throws Exception {
+    createServer(server, true, true);
+    server.invoke(() -> {
+      Cache cache = getCache();
+      DiskStoreImpl diskStore = (DiskStoreImpl) cache.findDiskStore("disk");
+      LocalRegion region = (LocalRegion) cache.getRegion("region");
+      region.put(1, "value1");
+      region.put(2, "value2"); // causes key 1 to be evicted and sit in the async queue
+      TXManagerImpl txManager = getCache().getTxManager();
+      txManager.begin();
+      assertNotEquals(region.getValueInVM(1), Token.NOT_AVAILABLE);
+      region.put(1, "new value");
+      TransactionId txId = txManager.suspend();
+      region.put(3, "value3");
+      region.put(4, "value4");
+      diskStore.flush();
+      txManager.resume(txId);
+
+      txManager.commit();
+
+      assertEquals("new value", region.get(1));
+    });
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
eshu11@apache.org.