Posted to commits@geode.apache.org by ja...@apache.org on 2018/09/10 20:36:35 UTC

[geode] branch develop updated: GEODE-5565: Release off heap memory if unable to set value (#2305)

This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new aaac1e5  GEODE-5565: Release off heap memory if unable to set value  (#2305)
aaac1e5 is described below

commit aaac1e5cefe6ad7ca533a7ca87eef9dd779912e4
Author: Jason Huynh <hu...@gmail.com>
AuthorDate: Mon Sep 10 13:36:28 2018 -0700

    GEODE-5565: Release off heap memory if unable to set value  (#2305)
    
    
      * Deallocate off heap memory for value if setValue fails
      * Check to see if entry is released before writing to disk
---
 .../geode/internal/cache/EntryEventImpl.java       |   6 +-
 .../cache/entries/AbstractDiskRegionEntry.java     |   8 +-
 .../geode/internal/cache/entries/DiskEntry.java    | 176 ++++++++++++---------
 .../internal/offheap/OffHeapRegionEntryHelper.java |   2 +-
 .../cache/entries/DiskEntryHelperTest.java         | 106 +++++++++++++
 5 files changed, 219 insertions(+), 79 deletions(-)
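
The commit message above comes down to a single ownership rule: when an
update fails partway through, whoever still holds the retained off-heap
value must release it, or the memory leaks. Below is a minimal,
self-contained sketch of that rule; Resource, storeValue(), and
failBeforeStore are hypothetical stand-ins for Geode's StoredObject, the
entry's setValueWithContext(), and a failed disk write, not the actual API.

    // Sketch of the release-on-failure pattern this commit applies.
    public class OwnershipTransferSketch {
      static class Resource {
        private int refCount = 1; // starts retained, like a prepared StoredObject

        void release() {
          refCount--;
        }

        int refCount() {
          return refCount;
        }
      }

      static void storeValue(Resource value, boolean failBeforeStore) {
        boolean storedInEntry = false;
        try {
          if (failBeforeStore) {
            throw new IllegalStateException("disk write failed");
          }
          storedInEntry = true; // ownership transferred; the entry releases it later
        } finally {
          if (!storedInEntry) {
            value.release(); // mirrors OffHeapHelper.release(newValue)
          }
        }
      }

      public static void main(String[] args) {
        Resource value = new Resource();
        try {
          storeValue(value, true);
        } catch (IllegalStateException expected) {
        }
        // Prints 0: the failed update did not leak the reference.
        System.out.println("refCount after failure: " + value.refCount());
      }
    }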

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 5d010ca..063f896 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -1713,7 +1713,11 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
       success = true;
     } finally {
       if (!success && reentry instanceof OffHeapRegionEntry && v instanceof StoredObject) {
-        OffHeapRegionEntryHelper.releaseEntry((OffHeapRegionEntry) reentry, (StoredObject) v);
+        if (!calledSetValue) {
+          OffHeapHelper.release(v);
+        } else {
+          OffHeapRegionEntryHelper.releaseEntry((OffHeapRegionEntry) reentry, (StoredObject) v);
+        }
       }
     }
     if (logger.isTraceEnabled()) {
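
The branch added in this hunk decides who still owns the off-heap reference
when the operation fails. A hedged paraphrase, using illustrative names
rather than Geode's actual types: if setValue() was never reached, only the
staged value holds a reference and it alone is released; once setValue() has
run, the entry may already point at the value's address, so entry and value
have to be released together to avoid a double free.

    // Illustrative paraphrase of the cleanup branch above; not Geode's types.
    public class FailureCleanupSketch {
      // ~ OffHeapHelper.release(v): drop the staged value's reference only.
      static void releaseValue(String value) {
        System.out.println("released staged value: " + value);
      }

      // ~ OffHeapRegionEntryHelper.releaseEntry(re, v): entry slot and value
      // are handled as a unit because the entry may hold the same address.
      static void releaseEntryAndValue(String entry, String value) {
        System.out.println("released entry " + entry + " and value " + value);
      }

      static void cleanupAfterFailure(boolean calledSetValue, String entry, String value) {
        if (!calledSetValue) {
          releaseValue(value);
        } else {
          releaseEntryAndValue(entry, value);
        }
      }

      public static void main(String[] args) {
        cleanupAfterFailure(false, "entry-1", "value-1"); // failed before setValue
        cleanupAfterFailure(true, "entry-1", "value-1");  // failed after setValue
      }
    }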
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractDiskRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractDiskRegionEntry.java
index 34d734e..2f16544 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractDiskRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractDiskRegionEntry.java
@@ -21,6 +21,7 @@ import org.apache.geode.internal.cache.RegionClearedException;
 import org.apache.geode.internal.cache.RegionEntryContext;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue;
+import org.apache.geode.internal.offheap.annotations.Unretained;
 
 public abstract class AbstractDiskRegionEntry extends AbstractRegionEntry implements DiskEntry {
   protected AbstractDiskRegionEntry(RegionEntryContext context, Object value) {
@@ -28,12 +29,13 @@ public abstract class AbstractDiskRegionEntry extends AbstractRegionEntry implem
   }
 
   @Override
-  public void setValue(RegionEntryContext context, Object v) throws RegionClearedException {
+  public void setValue(RegionEntryContext context, @Unretained Object v)
+      throws RegionClearedException {
     setValue(context, v, null);
   }
 
   @Override
-  public void setValue(RegionEntryContext context, Object value, EntryEventImpl event)
+  public void setValue(RegionEntryContext context, @Unretained Object value, EntryEventImpl event)
       throws RegionClearedException {
     Helper.update(this, (LocalRegion) context, value, event);
     setRecentlyUsed(context); // fix for bug #42284 - entry just put into the cache is evicted
@@ -46,7 +48,7 @@ public abstract class AbstractDiskRegionEntry extends AbstractRegionEntry implem
    * @param value an entry value.
    */
   @Override
-  public void setValueWithContext(RegionEntryContext context, Object value) {
+  public void setValueWithContext(RegionEntryContext context, @Unretained Object value) {
     _setValue(value);
     releaseOffHeapRefIfRegionBeingClosedOrDestroyed(context, value);
   }
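
The @Unretained annotations added to this file put the same contract into
the signatures: the callee does not bump the off-heap reference count of the
parameter, so the caller keeps ownership unless the value is actually stored
in the entry. As a rough illustration of such a source-level marker (the
name here is made up; the real annotation lives in
org.apache.geode.internal.offheap.annotations):

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Documentation-only marker: the callee does not increment the annotated
    // parameter's off-heap reference count, so the caller (or a successful
    // store into the entry) remains responsible for the eventual release.
    @Retention(RetentionPolicy.SOURCE)
    @Target(ElementType.PARAMETER)
    @interface UnretainedSketch {
    }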
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
index e55c0f7..6150a6c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
@@ -85,7 +85,7 @@ public interface DiskEntry extends RegionEntry {
    * @param context the value's context.
    * @param value an entry value.
    */
-  void setValueWithContext(RegionEntryContext context, Object value);
+  void setValueWithContext(RegionEntryContext context, @Unretained Object value);
 
   /**
    * In some cases we need to do something just before we drop the value from a DiskEntry that is
@@ -829,7 +829,7 @@ public interface DiskEntry extends RegionEntry {
       region.getDiskRegion().put(entry, region, vw, async);
     }
 
-    public static void update(DiskEntry entry, InternalRegion region, Object newValue)
+    public static void update(DiskEntry entry, InternalRegion region, @Unretained Object newValue)
         throws RegionClearedException {
       update(entry, region, newValue, null);
     }
@@ -838,40 +838,53 @@ public interface DiskEntry extends RegionEntry {
      * Updates the value of the disk entry with a new value. This allows us to free up disk space in
      * the non-backup case.
      */
-    public static void update(DiskEntry entry, InternalRegion region, Object newValue,
+    public static void update(DiskEntry entry, InternalRegion region, @Unretained Object newValue,
         EntryEventImpl event) throws RegionClearedException {
       if (newValue == null) {
         throw new NullPointerException(
             LocalizedStrings.DiskEntry_ENTRYS_VALUE_SHOULD_NOT_BE_NULL.toLocalizedString());
       }
-
-      AsyncDiskEntry asyncDiskEntry = null;
-      DiskRegion dr = region.getDiskRegion();
-      DiskId did = entry.getDiskId();
-      Object syncObj = did;
-      if (syncObj == null) {
-        syncObj = entry;
-      }
-      if (syncObj == did) {
-        dr.acquireReadLock();
-      }
+      boolean basicUpdateCalled = false;
       try {
-        synchronized (syncObj) {
-          asyncDiskEntry = basicUpdate(entry, region, newValue, event);
+
+        AsyncDiskEntry asyncDiskEntry = null;
+        DiskRegion dr = region.getDiskRegion();
+        DiskId did = entry.getDiskId();
+        Object syncObj = did;
+        if (syncObj == null) {
+          syncObj = entry;
         }
-      } finally {
         if (syncObj == did) {
-          dr.releaseReadLock();
+          dr.acquireReadLock();
+        }
+        try {
+          synchronized (syncObj) {
+            basicUpdateCalled = true;
+            asyncDiskEntry = basicUpdate(entry, region, newValue, event);
+          }
+        } finally {
+          if (syncObj == did) {
+            dr.releaseReadLock();
+          }
+        }
+        if (asyncDiskEntry != null && did.isPendingAsync()) {
+          // this needs to be done outside the above sync
+          scheduleAsyncWrite(asyncDiskEntry);
+        }
+      } finally {
+        if (!basicUpdateCalled) {
+          OffHeapHelper.release(newValue);
         }
-      }
-      if (asyncDiskEntry != null && did.isPendingAsync()) {
-        // this needs to be done outside the above sync
-        scheduleAsyncWrite(asyncDiskEntry);
       }
     }
 
+    static AsyncDiskEntry basicUpdateForTesting(DiskEntry entry, InternalRegion region,
+        @Unretained Object newValue, EntryEventImpl event) throws RegionClearedException {
+      return basicUpdate(entry, region, newValue, event);
+    }
+
     private static AsyncDiskEntry basicUpdate(DiskEntry entry, InternalRegion region,
-        Object newValue, EntryEventImpl event) throws RegionClearedException {
+        @Unretained Object newValue, EntryEventImpl event) throws RegionClearedException {
       AsyncDiskEntry result = null;
       DiskRegion dr = region.getDiskRegion();
       DiskId did = entry.getDiskId();
@@ -903,68 +916,83 @@ public interface DiskEntry extends RegionEntry {
       } else if (newValue instanceof RecoveredEntry) {
         ((RecoveredEntry) newValue).applyToDiskEntry(entry, region, dr, did);
       } else {
-        // The new value in the entry needs to be set after the disk writing
-        // has succeeded.
+        boolean newValueStoredInEntry = false;
+        try {
+          // The new value in the entry needs to be set after the disk writing
+          // has succeeded.
 
-        // entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+          // entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
 
-        if (did != null && did.isPendingAsync()) {
-          // if the entry was not yet written to disk, we didn't update
-          // the bytes on disk.
-          oldValueLength = 0;
-        } else {
-          oldValueLength = getValueLength(did);
-        }
+          if (did != null && did.isPendingAsync()) {
+            // if the entry was not yet written to disk, we didn't update
+            // the bytes on disk.
+            oldValueLength = 0;
+          } else {
+            oldValueLength = getValueLength(did);
+          }
+
+          if (dr.isBackup()) {
+            dr.testIsRecoveredAndClear(did); // fixes bug 41409
+            if (doSynchronousWrite(region, dr)) {
+              if (AbstractRegionEntry.isCompressible(dr, newValue)) {
+                // In case of compression the value is being set first
+                // so that writeToDisk can get it back from the entry
+                // decompressed if it does not have it already in the event.
+                // TODO: this may have introduced a bug with clear since
+                // writeToDisk can throw RegionClearedException which
+                // was supposed to stop us from changing entry.
+                newValueStoredInEntry = true;
+                entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+                // newValue is prepared and compressed. We can't write compressed values to disk.
+                if (!entry.isRemovedFromDisk()) {
+                  writeToDisk(entry, region, false, event);
+                }
+              } else {
+                writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
+                newValueStoredInEntry = true;
+                entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+              }
 
-        if (dr.isBackup()) {
-          dr.testIsRecoveredAndClear(did); // fixes bug 41409
-          if (doSynchronousWrite(region, dr)) {
-            if (AbstractRegionEntry.isCompressible(dr, newValue)) {
-              // In case of compression the value is being set first
-              // so that writeToDisk can get it back from the entry
-              // decompressed if it does not have it already in the event.
-              // TODO: this may have introduced a bug with clear since
-              // writeToDisk can throw RegionClearedException which
-              // was supposed to stop us from changing entry.
-              entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-              // newValue is prepared and compressed. We can't write compressed values to disk.
-              writeToDisk(entry, region, false, event);
             } else {
-              writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
+              // If we have concurrency checks enabled for a persistent region, we need
+              // to add an entry to the async queue for every update to maintain the RVV
+              boolean maintainRVV = region.getConcurrencyChecksEnabled();
+
+              if (!did.isPendingAsync() || maintainRVV) {
+                // if the entry is not async, we need to schedule it
+                // for regions with concurrency checks enabled, we add an entry
+                // to the queue for every entry.
+                did.setPendingAsync(true);
+                VersionTag tag = null;
+                VersionStamp stamp = entry.getVersionStamp();
+                if (stamp != null) {
+                  tag = stamp.asVersionTag();
+                }
+                result = new AsyncDiskEntry(region, entry, tag);
+              }
+              newValueStoredInEntry = true;
               entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
             }
+          } else if (did != null) {
+            newValueStoredInEntry = true;
+            entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
 
+            // Mark the id as needing to be written
+            // The disk remove that this section used to do caused bug 30961
+            // @todo this seems wrong. How does leaving it on disk fix the bug?
+            did.markForWriting();
+            // did.setValueSerializedSize(0);
           } else {
-            // If we have concurrency checks enabled for a persistent region, we need
-            // to add an entry to the async queue for every update to maintain the RVV
-            boolean maintainRVV = region.getConcurrencyChecksEnabled();
-
-            if (!did.isPendingAsync() || maintainRVV) {
-              // if the entry is not async, we need to schedule it
-              // for regions with concurrency checks enabled, we add an entry
-              // to the queue for every entry.
-              did.setPendingAsync(true);
-              VersionTag tag = null;
-              VersionStamp stamp = entry.getVersionStamp();
-              if (stamp != null) {
-                tag = stamp.asVersionTag();
-              }
-              result = new AsyncDiskEntry(region, entry, tag);
-            }
-            entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+            newValueStoredInEntry = true;
+            entry.setValueWithContext(region, newValue);
+          }
+        } finally {
+          if (!newValueStoredInEntry) {
+            OffHeapHelper.release(newValue);
           }
-        } else if (did != null) {
-          entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-
-          // Mark the id as needing to be written
-          // The disk remove that this section used to do caused bug 30961
-          // @todo this seems wrong. How does leaving it on disk fix the bug?
-          did.markForWriting();
-          // did.setValueSerializedSize(0);
-        } else {
-          entry.setValueWithContext(region, newValue);
         }
 
+
         if (Token.isInvalidOrRemoved(newValue)) {
           if (oldValue == null) {
             updateStats(dr, region, 0/* InVM */, -1/* OnDisk */, -oldValueLength);
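
Taken together, update() and basicUpdate() now release the value exactly
once on any failure path: the outer method releases only if basicUpdate()
was never entered (basicUpdateCalled is false), and basicUpdate() releases
only if no setValueWithContext() call transferred ownership
(newValueStoredInEntry is false). A reduced sketch of that nesting, with
hypothetical names in place of Geode's:

    // One ownership flag per level guarantees at most one release per path.
    public class NestedReleaseSketch {
      static void outerUpdate(Runnable inner, boolean failBeforeInner) {
        boolean innerCalled = false;
        try {
          if (failBeforeInner) {
            throw new IllegalStateException("failed before basicUpdate");
          }
          innerCalled = true;
          inner.run(); // from here on, the inner level owns the cleanup
        } finally {
          if (!innerCalled) {
            release(); // ~ OffHeapHelper.release(newValue) in update()
          }
        }
      }

      static void innerUpdate(boolean failBeforeStore) {
        boolean storedInEntry = false;
        try {
          if (failBeforeStore) {
            throw new IllegalStateException("disk write failed");
          }
          storedInEntry = true; // ~ entry.setValueWithContext(...)
        } finally {
          if (!storedInEntry) {
            release(); // ~ the new finally block in basicUpdate()
          }
        }
      }

      static void release() {
        System.out.println("value released once");
      }

      public static void main(String[] args) {
        try {
          outerUpdate(() -> innerUpdate(true), false); // inner fails: one release
        } catch (IllegalStateException expected) {
        }
        try {
          outerUpdate(() -> innerUpdate(false), true); // outer fails: one release
        } catch (IllegalStateException expected) {
        }
      }
    }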
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java
index f203bed..68e0391 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java
@@ -191,7 +191,7 @@ public class OffHeapRegionEntryHelper {
       @Released StoredObject expectedValue) {
     long oldAddress = objectToAddress(expectedValue);
     final long newAddress = objectToAddress(Token.REMOVED_PHASE2);
-    if (re.setAddress(oldAddress, newAddress) || re.getAddress() != newAddress) {
+    if (re.setAddress(oldAddress, newAddress)) {
       releaseAddress(oldAddress);
     } /*
        * else { if (!calledSetValue || re.getAddress() != newAddress) { expectedValue.release(); } }
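
The one-line change in this file narrows a compare-and-swap: before, the old
address was released even when setAddress() failed, as long as the current
address was not yet the REMOVED_PHASE2 token, so a thread that lost the race
could release memory it no longer owned. Now only the winning thread
releases. A minimal sketch of the corrected rule, with AtomicLong standing
in for OffHeapRegionEntry.setAddress():

    import java.util.concurrent.atomic.AtomicLong;

    // Only the thread whose CAS succeeds may free the old address.
    public class CasReleaseSketch {
      static final long REMOVED_PHASE2 = -1L; // ~ the removed-token address

      static boolean removeEntry(AtomicLong addressSlot, long expectedAddress) {
        if (addressSlot.compareAndSet(expectedAddress, REMOVED_PHASE2)) {
          releaseAddress(expectedAddress); // we won the race: safe to free
          return true;
        }
        return false; // someone else changed the slot; they own the cleanup
      }

      static void releaseAddress(long address) {
        System.out.println("released address " + address);
      }

      public static void main(String[] args) {
        AtomicLong slot = new AtomicLong(42L);
        System.out.println(removeEntry(slot, 42L)); // true: releases 42
        System.out.println(removeEntry(slot, 42L)); // false: no release
      }
    }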
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/DiskEntryHelperTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/DiskEntryHelperTest.java
index 3759afd..731a118 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/DiskEntryHelperTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/DiskEntryHelperTest.java
@@ -15,13 +15,26 @@
 package org.apache.geode.internal.cache.entries;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.internal.cache.DiskId;
 import org.apache.geode.internal.cache.DiskRegion;
+import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.offheap.StoredObject;
 
 public class DiskEntryHelperTest {
 
@@ -74,4 +87,97 @@ public class DiskEntryHelperTest {
     assertThat(result).isFalse();
   }
 
+  @Test
+  public void whenHelperUpdateCalledAndDiskRegionAcquireReadLockThrowsRegionDestroyedExceptionThenStoredObjectShouldBeReleased()
+      throws Exception {
+    LocalRegion lr = mock(LocalRegion.class);
+    DiskEntry diskEntry = mock(DiskEntry.class);
+    when(diskEntry.getDiskId()).thenReturn(mock(DiskId.class));
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    DiskRegion diskRegion = mock(DiskRegion.class);
+    when(lr.getDiskRegion()).thenReturn(diskRegion);
+    Mockito.doThrow(new RegionDestroyedException("Region Destroyed", "mocked region"))
+        .when(diskRegion).acquireReadLock();
+    StoredObject storedObject = mock(StoredObject.class);
+    try {
+      DiskEntry.Helper.update(diskEntry, lr, storedObject, entryEvent);
+      fail();
+    } catch (RegionDestroyedException rde) {
+      verify(storedObject, times(1)).release();
+    }
+  }
+
+
+  @Test
+  public void whenBasicUpdateWithDiskRegionBackupAndEntryNotSetThenReleaseOnStoredObjectShouldBeCalled()
+      throws Exception {
+    StoredObject storedObject = mock(StoredObject.class);
+    LocalRegion lr = mock(LocalRegion.class);
+    DiskEntry diskEntry = mock(DiskEntry.class);
+    when(diskEntry.getDiskId()).thenReturn(mock(DiskId.class));
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    DiskRegion diskRegion = mock(DiskRegion.class);
+    when(diskRegion.isBackup()).thenReturn(true);
+    doThrow(new RegionDestroyedException("", "")).when(diskRegion).put(eq(diskEntry), eq(lr),
+        ArgumentMatchers.any(DiskEntry.Helper.ValueWrapper.class), anyBoolean());
+    when(lr.getDiskRegion()).thenReturn(diskRegion);
+    try {
+      DiskEntry.Helper.basicUpdateForTesting(diskEntry, lr, storedObject, entryEvent);
+      fail();
+    } catch (RegionDestroyedException rde) {
+      verify(storedObject, times(1)).release();
+    }
+  }
+
+  @Test
+  public void whenBasicUpdateWithDiskRegionBackupAndAsyncWritesAndEntryNotSetThenReleaseOnStoredObjectShouldBeCalled()
+      throws Exception {
+    StoredObject storedObject = mock(StoredObject.class);
+    LocalRegion lr = mock(LocalRegion.class);
+    DiskEntry diskEntry = mock(DiskEntry.class);
+    when(diskEntry.getDiskId()).thenReturn(mock(DiskId.class));
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    DiskRegion diskRegion = mock(DiskRegion.class);
+    when(diskRegion.isBackup()).thenReturn(true);
+    doThrow(new RegionDestroyedException("", "")).when(diskRegion).put(eq(diskEntry), eq(lr),
+        ArgumentMatchers.any(DiskEntry.Helper.ValueWrapper.class), anyBoolean());
+    when(lr.getDiskRegion()).thenReturn(diskRegion);
+
+    when(diskRegion.isSync()).thenReturn(false);
+    when(lr.isInitialized()).thenReturn(true);
+    when(lr.getConcurrencyChecksEnabled()).thenThrow(new RegionDestroyedException("", ""));
+    try {
+      DiskEntry.Helper.basicUpdateForTesting(diskEntry, lr, storedObject, entryEvent);
+      fail();
+    } catch (RegionDestroyedException rde) {
+      verify(storedObject, times(1)).release();
+    }
+  }
+
+  @Test
+  public void whenBasicUpdateButNotBackupAndEntrySet() throws Exception {
+    StoredObject storedObject = mock(StoredObject.class);
+    LocalRegion lr = mock(LocalRegion.class);
+    DiskEntry diskEntry = mock(DiskEntry.class);
+    when(diskEntry.getDiskId()).thenReturn(mock(DiskId.class));
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    DiskRegion diskRegion = mock(DiskRegion.class);
+    when(diskRegion.isBackup()).thenReturn(false);
+    when(lr.getDiskRegion()).thenReturn(diskRegion);
+    DiskEntry.Helper.basicUpdateForTesting(diskEntry, lr, storedObject, entryEvent);
+    verify(storedObject, times(0)).release();
+  }
+
+  @Test
+  public void whenBasicUpdateButNotBackupAndDiskIdIsNullAndEntrySet() throws Exception {
+    StoredObject storedObject = mock(StoredObject.class);
+    LocalRegion lr = mock(LocalRegion.class);
+    DiskEntry diskEntry = mock(DiskEntry.class);
+    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+    DiskRegion diskRegion = mock(DiskRegion.class);
+    when(diskRegion.isBackup()).thenReturn(false);
+    when(lr.getDiskRegion()).thenReturn(diskRegion);
+    DiskEntry.Helper.basicUpdateForTesting(diskEntry, lr, storedObject, entryEvent);
+    verify(storedObject, times(0)).release();
+  }
 }
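
The new tests above all share one verification shape: mock a collaborator so
the update throws at a chosen point, then assert the release side effect
fired exactly once. A reduced sketch of that shape, assuming a hypothetical
Resource interface in place of StoredObject and a deliberately failing
update in place of DiskEntry.Helper.update():

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    // Reduced sketch of the verify-the-cleanup test shape used above.
    public class ReleaseVerificationSketch {
      interface Resource {
        void release();
      }

      static void updateThatFails(Resource value) {
        try {
          throw new IllegalStateException("simulated region failure");
        } finally {
          value.release(); // the cleanup under test
        }
      }

      public static void main(String[] args) {
        Resource value = mock(Resource.class);
        try {
          updateThatFails(value);
        } catch (IllegalStateException expected) {
        }
        verify(value, times(1)).release(); // cleanup happened exactly once
      }
    }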