You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/04/14 22:30:17 UTC

[07/18] incubator-geode git commit: GEODE-1199: fix off-heap leak in netWrite

GEODE-1199: fix off-heap leak in netWrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/80533ba8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/80533ba8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/80533ba8

Branch: refs/heads/feature/GEODE-1162
Commit: 80533ba8503e8fa04aed3291b2d4efbc29e61535
Parents: fbee35c
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Thu Apr 7 16:20:18 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Apr 13 10:16:20 2016 -0700

----------------------------------------------------------------------
 .../cache/SearchLoadAndWriteProcessor.java      | 28 ++++++--
 .../cache/SearchLoadAndWriteProcessorTest.java  | 68 ++++++++++++++++++++
 2 files changed, 90 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/80533ba8/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
index 8224fc2..d9729a7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
@@ -74,6 +74,8 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 
 
 /**
@@ -211,7 +213,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
     int action = paction;
     this.requestInProgress = true;
-    Scope scope = this.region.scope;
+    Scope scope = this.region.getScope();
     if (localWriter != null) {
       doLocalWrite(localWriter, event, action);
       this.requestInProgress = false;
@@ -220,14 +222,22 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
     if (scope == Scope.LOCAL && (region.getPartitionAttributes() == null)) {
       return false;
     }
+    @Released
     CacheEvent listenerEvent = getEventForListener(event);
+    try {
     if (action == BEFOREUPDATE && listenerEvent.getOperation().isCreate()) {
       action = BEFORECREATE;
     }
-    boolean cacheWrote = netWrite(getEventForListener(event), action, netWriteRecipients);
+    boolean cacheWrote = netWrite(listenerEvent, action, netWriteRecipients);
     this.requestInProgress = false;
     return cacheWrote;
-
+    } finally {
+      if (event != listenerEvent) {
+        if (listenerEvent instanceof EntryEventImpl) {
+          ((EntryEventImpl) listenerEvent).release();
+        }
+      }
+    }
   }
 
   public void memberJoined(InternalDistributedMember id) {
@@ -810,16 +820,20 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
   /**
    * Returns an event for listener notification.  The event's operation
-   * may be altered to conform to the ConcurrentMap implementation specification
+   * may be altered to conform to the ConcurrentMap implementation specification.
+   * If the returned event is not the same instance as the event parameter,
+   * the caller is responsible for releasing it.
    * @param event the original event
    * @return the original event or a new event having a change in operation
    */
+  @Retained
   private CacheEvent getEventForListener(CacheEvent event) {
     Operation op = event.getOperation();
     if (!op.isEntry()) {
       return event;
     } else {
       EntryEventImpl r = (EntryEventImpl)event;
+      @Retained
       EntryEventImpl result = r;
       if (r.isSingleHop()) {
         // fix for bug #46130 - origin remote incorrect for one-hop operation in receiver
@@ -856,6 +870,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         return false;
       }
     }
+    @Released
     CacheEvent event = getEventForListener(pevent);
     
     int action = paction;
@@ -898,8 +913,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
   /** Return true if cache writer was invoked */
   private boolean netWrite(CacheEvent event, int action, Set writeCandidateSet)
   throws CacheWriterException, TimeoutException {
-
-    // assert !writeCandidateSet.isEmpty();
+    if (writeCandidateSet == null || writeCandidateSet.isEmpty()) {
+      return false;
+    }
     ArrayList list = new ArrayList(writeCandidateSet);
     Collections.shuffle(list);
     InternalDistributedMember[] writeCandidates = (InternalDistributedMember[])list.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/80533ba8/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java
new file mode 100644
index 0000000..57aca37
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class SearchLoadAndWriteProcessorTest {
+
+  /**
+   * This test verifies the fix for GEODE-1199.
+   * It verifies that when doNetWrite is called with an event
+   * whose value is a StoredObject, "release" is called on
+   * that value.
+   */
+  @Test
+  public void verifyThatOffHeapReleaseIsCalledAfterNetWrite() {
+    // setup
+    SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+    LocalRegion lr = mock(LocalRegion.class);
+    when(lr.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
+    Object key = "key";
+    StoredObject value = mock(StoredObject.class);
+    when(value.hasRefCount()).thenReturn(true);
+    when(value.retain()).thenReturn(true);
+    Object cbArg = null;
+    KeyInfo keyInfo = new KeyInfo(key, value, cbArg);
+    when(lr.getKeyInfo(any(), any(), any())).thenReturn(keyInfo);
+    processor.region = lr;
+    EntryEventImpl event = EntryEventImpl.create(lr, Operation.REPLACE, key, value, cbArg, false, null);
+    
+    try {
+      // the test
+      processor.doNetWrite(event, null, null, 0);
+      
+      // verification
+      verify(value, times(2)).retain();
+      verify(value, times(1)).release();
+      
+    } finally {
+      processor.release();
+    }
+  }
+
+}