You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2018/09/10 20:36:35 UTC
[geode] branch develop updated: GEODE-5565: Release off heap memory if unable to set value (#2305)
This is an automated email from the ASF dual-hosted git repository.
jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new aaac1e5 GEODE-5565: Release off heap memory if unable to set value (#2305)
aaac1e5 is described below
commit aaac1e5cefe6ad7ca533a7ca87eef9dd779912e4
Author: Jason Huynh <hu...@gmail.com>
AuthorDate: Mon Sep 10 13:36:28 2018 -0700
GEODE-5565: Release off heap memory if unable to set value (#2305)
* Deallocate off heap memory for value if setValue fails
* Check to see if entry is released before writing to disk
---
.../geode/internal/cache/EntryEventImpl.java | 6 +-
.../cache/entries/AbstractDiskRegionEntry.java | 8 +-
.../geode/internal/cache/entries/DiskEntry.java | 176 ++++++++++++---------
.../internal/offheap/OffHeapRegionEntryHelper.java | 2 +-
.../cache/entries/DiskEntryHelperTest.java | 106 +++++++++++++
5 files changed, 219 insertions(+), 79 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 5d010ca..063f896 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -1713,7 +1713,11 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
success = true;
} finally {
if (!success && reentry instanceof OffHeapRegionEntry && v instanceof StoredObject) {
- OffHeapRegionEntryHelper.releaseEntry((OffHeapRegionEntry) reentry, (StoredObject) v);
+ if (!calledSetValue) {
+ OffHeapHelper.release(v);
+ } else {
+ OffHeapRegionEntryHelper.releaseEntry((OffHeapRegionEntry) reentry, (StoredObject) v);
+ }
}
}
if (logger.isTraceEnabled()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractDiskRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractDiskRegionEntry.java
index 34d734e..2f16544 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractDiskRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractDiskRegionEntry.java
@@ -21,6 +21,7 @@ import org.apache.geode.internal.cache.RegionClearedException;
import org.apache.geode.internal.cache.RegionEntryContext;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue;
+import org.apache.geode.internal.offheap.annotations.Unretained;
public abstract class AbstractDiskRegionEntry extends AbstractRegionEntry implements DiskEntry {
protected AbstractDiskRegionEntry(RegionEntryContext context, Object value) {
@@ -28,12 +29,13 @@ public abstract class AbstractDiskRegionEntry extends AbstractRegionEntry implem
}
@Override
- public void setValue(RegionEntryContext context, Object v) throws RegionClearedException {
+ public void setValue(RegionEntryContext context, @Unretained Object v)
+ throws RegionClearedException {
setValue(context, v, null);
}
@Override
- public void setValue(RegionEntryContext context, Object value, EntryEventImpl event)
+ public void setValue(RegionEntryContext context, @Unretained Object value, EntryEventImpl event)
throws RegionClearedException {
Helper.update(this, (LocalRegion) context, value, event);
setRecentlyUsed(context); // fix for bug #42284 - entry just put into the cache is evicted
@@ -46,7 +48,7 @@ public abstract class AbstractDiskRegionEntry extends AbstractRegionEntry implem
* @param value an entry value.
*/
@Override
- public void setValueWithContext(RegionEntryContext context, Object value) {
+ public void setValueWithContext(RegionEntryContext context, @Unretained Object value) {
_setValue(value);
releaseOffHeapRefIfRegionBeingClosedOrDestroyed(context, value);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
index e55c0f7..6150a6c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
@@ -85,7 +85,7 @@ public interface DiskEntry extends RegionEntry {
* @param context the value's context.
* @param value an entry value.
*/
- void setValueWithContext(RegionEntryContext context, Object value);
+ void setValueWithContext(RegionEntryContext context, @Unretained Object value);
/**
* In some cases we need to do something just before we drop the value from a DiskEntry that is
@@ -829,7 +829,7 @@ public interface DiskEntry extends RegionEntry {
region.getDiskRegion().put(entry, region, vw, async);
}
- public static void update(DiskEntry entry, InternalRegion region, Object newValue)
+ public static void update(DiskEntry entry, InternalRegion region, @Unretained Object newValue)
throws RegionClearedException {
update(entry, region, newValue, null);
}
@@ -838,40 +838,53 @@ public interface DiskEntry extends RegionEntry {
* Updates the value of the disk entry with a new value. This allows us to free up disk space in
* the non-backup case.
*/
- public static void update(DiskEntry entry, InternalRegion region, Object newValue,
+ public static void update(DiskEntry entry, InternalRegion region, @Unretained Object newValue,
EntryEventImpl event) throws RegionClearedException {
if (newValue == null) {
throw new NullPointerException(
LocalizedStrings.DiskEntry_ENTRYS_VALUE_SHOULD_NOT_BE_NULL.toLocalizedString());
}
-
- AsyncDiskEntry asyncDiskEntry = null;
- DiskRegion dr = region.getDiskRegion();
- DiskId did = entry.getDiskId();
- Object syncObj = did;
- if (syncObj == null) {
- syncObj = entry;
- }
- if (syncObj == did) {
- dr.acquireReadLock();
- }
+ boolean basicUpdateCalled = false;
try {
- synchronized (syncObj) {
- asyncDiskEntry = basicUpdate(entry, region, newValue, event);
+
+ AsyncDiskEntry asyncDiskEntry = null;
+ DiskRegion dr = region.getDiskRegion();
+ DiskId did = entry.getDiskId();
+ Object syncObj = did;
+ if (syncObj == null) {
+ syncObj = entry;
}
- } finally {
if (syncObj == did) {
- dr.releaseReadLock();
+ dr.acquireReadLock();
+ }
+ try {
+ synchronized (syncObj) {
+ basicUpdateCalled = true;
+ asyncDiskEntry = basicUpdate(entry, region, newValue, event);
+ }
+ } finally {
+ if (syncObj == did) {
+ dr.releaseReadLock();
+ }
+ }
+ if (asyncDiskEntry != null && did.isPendingAsync()) {
+ // this needs to be done outside the above sync
+ scheduleAsyncWrite(asyncDiskEntry);
+ }
+ } finally {
+ if (!basicUpdateCalled) {
+ OffHeapHelper.release(newValue);
}
- }
- if (asyncDiskEntry != null && did.isPendingAsync()) {
- // this needs to be done outside the above sync
- scheduleAsyncWrite(asyncDiskEntry);
}
}
+ static AsyncDiskEntry basicUpdateForTesting(DiskEntry entry, InternalRegion region,
+ @Unretained Object newValue, EntryEventImpl event) throws RegionClearedException {
+ return basicUpdate(entry, region, newValue, event);
+ }
+
private static AsyncDiskEntry basicUpdate(DiskEntry entry, InternalRegion region,
- Object newValue, EntryEventImpl event) throws RegionClearedException {
+ @Unretained Object newValue, EntryEventImpl event) throws RegionClearedException {
AsyncDiskEntry result = null;
DiskRegion dr = region.getDiskRegion();
DiskId did = entry.getDiskId();
@@ -903,68 +916,83 @@ public interface DiskEntry extends RegionEntry {
} else if (newValue instanceof RecoveredEntry) {
((RecoveredEntry) newValue).applyToDiskEntry(entry, region, dr, did);
} else {
- // The new value in the entry needs to be set after the disk writing
- // has succeeded.
+ boolean newValueStoredInEntry = false;
+ try {
+ // The new value in the entry needs to be set after the disk writing
+ // has succeeded.
- // entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+ // entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
- if (did != null && did.isPendingAsync()) {
- // if the entry was not yet written to disk, we didn't update
- // the bytes on disk.
- oldValueLength = 0;
- } else {
- oldValueLength = getValueLength(did);
- }
+ if (did != null && did.isPendingAsync()) {
+ // if the entry was not yet written to disk, we didn't update
+ // the bytes on disk.
+ oldValueLength = 0;
+ } else {
+ oldValueLength = getValueLength(did);
+ }
+
+ if (dr.isBackup()) {
+ dr.testIsRecoveredAndClear(did); // fixes bug 41409
+ if (doSynchronousWrite(region, dr)) {
+ if (AbstractRegionEntry.isCompressible(dr, newValue)) {
+ // In case of compression the value is being set first
+ // so that writeToDisk can get it back from the entry
+ // decompressed if it does not have it already in the event.
+ // TODO: this may have introduced a bug with clear since
+ // writeToDisk can throw RegionClearedException which
+ // was supposed to stop us from changing entry.
+ newValueStoredInEntry = true;
+ entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+ // newValue is prepared and compressed. We can't write compressed values to disk.
+ if (!entry.isRemovedFromDisk()) {
+ writeToDisk(entry, region, false, event);
+ }
+ } else {
+ writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
+ newValueStoredInEntry = true;
+ entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+ }
- if (dr.isBackup()) {
- dr.testIsRecoveredAndClear(did); // fixes bug 41409
- if (doSynchronousWrite(region, dr)) {
- if (AbstractRegionEntry.isCompressible(dr, newValue)) {
- // In case of compression the value is being set first
- // so that writeToDisk can get it back from the entry
- // decompressed if it does not have it already in the event.
- // TODO: this may have introduced a bug with clear since
- // writeToDisk can throw RegionClearedException which
- // was supposed to stop us from changing entry.
- entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
- // newValue is prepared and compressed. We can't write compressed values to disk.
- writeToDisk(entry, region, false, event);
} else {
- writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
+ // If we have concurrency checks enabled for a persistent region, we need
+ // to add an entry to the async queue for every update to maintain the RVV
+ boolean maintainRVV = region.getConcurrencyChecksEnabled();
+
+ if (!did.isPendingAsync() || maintainRVV) {
+ // if the entry is not async, we need to schedule it
+ // for regions with concurrency checks enabled, we add an entry
+ // to the queue for every entry.
+ did.setPendingAsync(true);
+ VersionTag tag = null;
+ VersionStamp stamp = entry.getVersionStamp();
+ if (stamp != null) {
+ tag = stamp.asVersionTag();
+ }
+ result = new AsyncDiskEntry(region, entry, tag);
+ }
+ newValueStoredInEntry = true;
entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
}
+ } else if (did != null) {
+ newValueStoredInEntry = true;
+ entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+ // Mark the id as needing to be written
+ // The disk remove that this section used to do caused bug 30961
+ // @todo this seems wrong. How does leaving it on disk fix the bug?
+ did.markForWriting();
+ // did.setValueSerializedSize(0);
} else {
- // If we have concurrency checks enabled for a persistent region, we need
- // to add an entry to the async queue for every update to maintain the RVV
- boolean maintainRVV = region.getConcurrencyChecksEnabled();
-
- if (!did.isPendingAsync() || maintainRVV) {
- // if the entry is not async, we need to schedule it
- // for regions with concurrency checks enabled, we add an entry
- // to the queue for every entry.
- did.setPendingAsync(true);
- VersionTag tag = null;
- VersionStamp stamp = entry.getVersionStamp();
- if (stamp != null) {
- tag = stamp.asVersionTag();
- }
- result = new AsyncDiskEntry(region, entry, tag);
- }
- entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+ newValueStoredInEntry = true;
+ entry.setValueWithContext(region, newValue);
+ }
+ } finally {
+ if (!newValueStoredInEntry) {
+ OffHeapHelper.release(newValue);
}
- } else if (did != null) {
- entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-
- // Mark the id as needing to be written
- // The disk remove that this section used to do caused bug 30961
- // @todo this seems wrong. How does leaving it on disk fix the bug?
- did.markForWriting();
- // did.setValueSerializedSize(0);
- } else {
- entry.setValueWithContext(region, newValue);
}
+
if (Token.isInvalidOrRemoved(newValue)) {
if (oldValue == null) {
updateStats(dr, region, 0/* InVM */, -1/* OnDisk */, -oldValueLength);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java
index f203bed..68e0391 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java
@@ -191,7 +191,7 @@ public class OffHeapRegionEntryHelper {
@Released StoredObject expectedValue) {
long oldAddress = objectToAddress(expectedValue);
final long newAddress = objectToAddress(Token.REMOVED_PHASE2);
- if (re.setAddress(oldAddress, newAddress) || re.getAddress() != newAddress) {
+ if (re.setAddress(oldAddress, newAddress)) {
releaseAddress(oldAddress);
} /*
* else { if (!calledSetValue || re.getAddress() != newAddress) { expectedValue.release(); } }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/DiskEntryHelperTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/DiskEntryHelperTest.java
index 3759afd..731a118 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/entries/DiskEntryHelperTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/entries/DiskEntryHelperTest.java
@@ -15,13 +15,26 @@
package org.apache.geode.internal.cache.entries;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.internal.cache.DiskId;
import org.apache.geode.internal.cache.DiskRegion;
+import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.offheap.StoredObject;
public class DiskEntryHelperTest {
@@ -74,4 +87,97 @@ public class DiskEntryHelperTest {
assertThat(result).isFalse();
}
+ @Test
+ public void whenHelperUpdateCalledAndDiskRegionAcquireReadLockThrowsRegionDestroyedExceptionThenStoredObjectShouldBeReleased()
+ throws Exception {
+ LocalRegion lr = mock(LocalRegion.class);
+ DiskEntry diskEntry = mock(DiskEntry.class);
+ when(diskEntry.getDiskId()).thenReturn(mock(DiskId.class));
+ EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+ DiskRegion diskRegion = mock(DiskRegion.class);
+ when(lr.getDiskRegion()).thenReturn(diskRegion);
+ Mockito.doThrow(new RegionDestroyedException("Region Destroyed", "mocked region"))
+ .when(diskRegion).acquireReadLock();
+ StoredObject storedObject = mock(StoredObject.class);
+ try {
+ DiskEntry.Helper.update(diskEntry, lr, storedObject, entryEvent);
+ fail();
+ } catch (RegionDestroyedException rde) {
+ verify(storedObject, times(1)).release();
+ }
+ }
+
+
+ @Test
+ public void whenBasicUpdateWithDiskRegionBackupAndEntryNotSetThenReleaseOnStoredObjectShouldBeCalled()
+ throws Exception {
+ StoredObject storedObject = mock(StoredObject.class);
+ LocalRegion lr = mock(LocalRegion.class);
+ DiskEntry diskEntry = mock(DiskEntry.class);
+ when(diskEntry.getDiskId()).thenReturn(mock(DiskId.class));
+ EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+ DiskRegion diskRegion = mock(DiskRegion.class);
+ when(diskRegion.isBackup()).thenReturn(true);
+ doThrow(new RegionDestroyedException("", "")).when(diskRegion).put(eq(diskEntry), eq(lr),
+ ArgumentMatchers.any(DiskEntry.Helper.ValueWrapper.class), anyBoolean());
+ when(lr.getDiskRegion()).thenReturn(diskRegion);
+ try {
+ DiskEntry.Helper.basicUpdateForTesting(diskEntry, lr, storedObject, entryEvent);
+ fail();
+ } catch (RegionDestroyedException rde) {
+ verify(storedObject, times(1)).release();
+ }
+ }
+
+ @Test
+ public void whenBasicUpdateWithDiskRegionBackupAndAsyncWritesAndEntryNotSetThenReleaseOnStoredObjectShouldBeCalled()
+ throws Exception {
+ StoredObject storedObject = mock(StoredObject.class);
+ LocalRegion lr = mock(LocalRegion.class);
+ DiskEntry diskEntry = mock(DiskEntry.class);
+ when(diskEntry.getDiskId()).thenReturn(mock(DiskId.class));
+ EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+ DiskRegion diskRegion = mock(DiskRegion.class);
+ when(diskRegion.isBackup()).thenReturn(true);
+ doThrow(new RegionDestroyedException("", "")).when(diskRegion).put(eq(diskEntry), eq(lr),
+ ArgumentMatchers.any(DiskEntry.Helper.ValueWrapper.class), anyBoolean());
+ when(lr.getDiskRegion()).thenReturn(diskRegion);
+
+ when(diskRegion.isSync()).thenReturn(false);
+ when(lr.isInitialized()).thenReturn(true);
+ when(lr.getConcurrencyChecksEnabled()).thenThrow(new RegionDestroyedException("", ""));
+ try {
+ DiskEntry.Helper.basicUpdateForTesting(diskEntry, lr, storedObject, entryEvent);
+ fail();
+ } catch (RegionDestroyedException rde) {
+ verify(storedObject, times(1)).release();
+ }
+ }
+
+ @Test
+ public void whenBasicUpdateButNotBackupAndEntrySet() throws Exception {
+ StoredObject storedObject = mock(StoredObject.class);
+ LocalRegion lr = mock(LocalRegion.class);
+ DiskEntry diskEntry = mock(DiskEntry.class);
+ when(diskEntry.getDiskId()).thenReturn(mock(DiskId.class));
+ EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+ DiskRegion diskRegion = mock(DiskRegion.class);
+ when(diskRegion.isBackup()).thenReturn(false);
+ when(lr.getDiskRegion()).thenReturn(diskRegion);
+ DiskEntry.Helper.basicUpdateForTesting(diskEntry, lr, storedObject, entryEvent);
+ verify(storedObject, times(0)).release();
+ }
+
+ @Test
+ public void whenBasicUpdateButNotBackupAndDiskIdIsNullAndEntrySet() throws Exception {
+ StoredObject storedObject = mock(StoredObject.class);
+ LocalRegion lr = mock(LocalRegion.class);
+ DiskEntry diskEntry = mock(DiskEntry.class);
+ EntryEventImpl entryEvent = mock(EntryEventImpl.class);
+ DiskRegion diskRegion = mock(DiskRegion.class);
+ when(diskRegion.isBackup()).thenReturn(false);
+ when(lr.getDiskRegion()).thenReturn(diskRegion);
+ DiskEntry.Helper.basicUpdateForTesting(diskEntry, lr, storedObject, entryEvent);
+ verify(storedObject, times(0)).release();
+ }
}