You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/04/13 19:17:31 UTC
incubator-geode git commit: GEODE-1199: fix off-heap leak in netWrite
Repository: incubator-geode
Updated Branches:
refs/heads/develop fbee35cc4 -> 80533ba85
GEODE-1199: fix off-heap leak in netWrite
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/80533ba8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/80533ba8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/80533ba8
Branch: refs/heads/develop
Commit: 80533ba8503e8fa04aed3291b2d4efbc29e61535
Parents: fbee35c
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Thu Apr 7 16:20:18 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Apr 13 10:16:20 2016 -0700
----------------------------------------------------------------------
.../cache/SearchLoadAndWriteProcessor.java | 28 ++++++--
.../cache/SearchLoadAndWriteProcessorTest.java | 68 ++++++++++++++++++++
2 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/80533ba8/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
index 8224fc2..d9729a7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
@@ -74,6 +74,8 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
/**
@@ -211,7 +213,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
int action = paction;
this.requestInProgress = true;
- Scope scope = this.region.scope;
+ Scope scope = this.region.getScope();
if (localWriter != null) {
doLocalWrite(localWriter, event, action);
this.requestInProgress = false;
@@ -220,14 +222,22 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
if (scope == Scope.LOCAL && (region.getPartitionAttributes() == null)) {
return false;
}
+ @Released
CacheEvent listenerEvent = getEventForListener(event);
+ try {
if (action == BEFOREUPDATE && listenerEvent.getOperation().isCreate()) {
action = BEFORECREATE;
}
- boolean cacheWrote = netWrite(getEventForListener(event), action, netWriteRecipients);
+ boolean cacheWrote = netWrite(listenerEvent, action, netWriteRecipients);
this.requestInProgress = false;
return cacheWrote;
-
+ } finally {
+ if (event != listenerEvent) {
+ if (listenerEvent instanceof EntryEventImpl) {
+ ((EntryEventImpl) listenerEvent).release();
+ }
+ }
+ }
}
public void memberJoined(InternalDistributedMember id) {
@@ -810,16 +820,20 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
/**
* Returns an event for listener notification. The event's operation
- * may be altered to conform to the ConcurrentMap implementation specification
+ * may be altered to conform to the ConcurrentMap implementation specification.
+ * If the returned value is not == to the event parameter then the caller
+ * is responsible for releasing it.
* @param event the original event
* @return the original event or a new event having a change in operation
*/
+ @Retained
private CacheEvent getEventForListener(CacheEvent event) {
Operation op = event.getOperation();
if (!op.isEntry()) {
return event;
} else {
EntryEventImpl r = (EntryEventImpl)event;
+ @Retained
EntryEventImpl result = r;
if (r.isSingleHop()) {
// fix for bug #46130 - origin remote incorrect for one-hop operation in receiver
@@ -856,6 +870,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
return false;
}
}
+ @Released
CacheEvent event = getEventForListener(pevent);
int action = paction;
@@ -898,8 +913,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
/** Return true if cache writer was invoked */
private boolean netWrite(CacheEvent event, int action, Set writeCandidateSet)
throws CacheWriterException, TimeoutException {
-
- // assert !writeCandidateSet.isEmpty();
+ if (writeCandidateSet == null || writeCandidateSet.isEmpty()) {
+ return false;
+ }
ArrayList list = new ArrayList(writeCandidateSet);
Collections.shuffle(list);
InternalDistributedMember[] writeCandidates = (InternalDistributedMember[])list.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/80533ba8/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java
new file mode 100644
index 0000000..57aca37
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class SearchLoadAndWriteProcessorTest {
+
+ /**
+ * This test verifies the fix for GEODE-1199.
+ * It verifies that when doNetWrite is called with an event
+ * whose value is a StoredObject, "release" is called on
+ * that value.
+ */
+ @Test
+ public void verifyThatOffHeapReleaseIsCalledAfterNetWrite() {
+ // setup
+ SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+ LocalRegion lr = mock(LocalRegion.class);
+ when(lr.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+ Object key = "key";
+ StoredObject value = mock(StoredObject.class);
+ when(value.hasRefCount()).thenReturn(true);
+ when(value.retain()).thenReturn(true);
+ Object cbArg = null;
+ KeyInfo keyInfo = new KeyInfo(key, value, cbArg);
+ when(lr.getKeyInfo(any(), any(), any())).thenReturn(keyInfo);
+ processor.region = lr;
+ EntryEventImpl event = EntryEventImpl.create(lr, Operation.REPLACE, key, value, cbArg, false, null);
+
+ try {
+ // the test
+ processor.doNetWrite(event, null, null, 0);
+
+ // verification
+ verify(value, times(2)).retain();
+ verify(value, times(1)).release();
+
+ } finally {
+ processor.release();
+ }
+ }
+
+}