You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/06/07 20:54:52 UTC

[37/62] [abbrv] incubator-geode git commit: GEODE-1468 client/server messaging can create large objects

GEODE-1468 client/server messaging can create large objects

After a Message has been sent we invoke clear() on each Part contained by
the Message.  Previously this only nulled out the "part" field of each Part
object, but if that field referenced a HeapDataOutputStream, the stream
could still be holding a list of large buffers.  This change set alters
Part.clear() to close such streams so that their buffers can be released.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/61ad7e44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/61ad7e44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/61ad7e44

Branch: refs/heads/feature/GEODE-837
Commit: 61ad7e4451aa5504c2f5d92b5d41ce1ffbcec239
Parents: 711fc35
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Fri Jun 3 08:42:00 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Fri Jun 3 08:43:59 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/HeapDataOutputStream.java  |  5 ++-
 .../internal/cache/tier/sockets/Message.java    | 33 ++++++++++----------
 .../internal/cache/tier/sockets/Part.java       |  7 ++++-
 .../cache/tier/sockets/MessageJUnitTest.java    | 18 +++++++++++
 4 files changed, 45 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/61ad7e44/geode-core/src/main/java/com/gemstone/gemfire/internal/HeapDataOutputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/HeapDataOutputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/HeapDataOutputStream.java
old mode 100644
new mode 100755
index 4bf39b6..eaad26e
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/HeapDataOutputStream.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/HeapDataOutputStream.java
@@ -366,7 +366,10 @@ public class HeapDataOutputStream extends OutputStream implements
   
   public final void reset() {
     this.size = 0;
-    this.chunks = null;
+    if (this.chunks != null) {
+      this.chunks.clear();
+      this.chunks = null;
+    }
     this.buffer.clear();
     this.writeMode = true;
     this.ignoreWrites = false;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/61ad7e44/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
index 139ccde..459cf5f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
@@ -513,18 +513,21 @@ public class Message  {
       // Keep track of the fact that we are making progress.
       this.sc.updateProcessingMessage();
     }
-    if (this.socket != null) {
+    if (this.socket == null) {
+      throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
+    }
+    try {
       final ByteBuffer cb = getCommBuffer();
       if (cb == null) {
         throw new IOException("No buffer");
       }
       int msgLen = 0;
-      synchronized(cb) {
+      synchronized (cb) {
         long totalPartLen = 0;
         long headerLen = 0;
         int partsToTransmit = this.numberOfParts;
-        
-        for (int i=0; i < this.numberOfParts; i++) {
+
+        for (int i = 0; i < this.numberOfParts; i++) {
           Part part = this.partsList[i];
           headerLen += PART_HEADER_SIZE;
           totalPartLen += part.getLength();
@@ -540,27 +543,27 @@ public class Message  {
           partsToTransmit++;
         }
 
-        if ( (headerLen + totalPartLen) > Integer.MAX_VALUE ) {
-          throw new MessageTooLargeException("Message size (" + (headerLen + totalPartLen) 
+        if ((headerLen + totalPartLen) > Integer.MAX_VALUE) {
+          throw new MessageTooLargeException("Message size (" + (headerLen + totalPartLen)
               + ") exceeds maximum integer value");
         }
-        
-        msgLen = (int)(headerLen + totalPartLen);
-        
+
+        msgLen = (int) (headerLen + totalPartLen);
+
         if (msgLen > MAX_MESSAGE_SIZE) {
           throw new MessageTooLargeException("Message size (" + msgLen
               + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")");
         }
-        
+
         cb.clear();
         packHeaderInfoForSending(msgLen, (securityPart != null));
-        for (int i=0; i < partsToTransmit; i++) {
+        for (int i = 0; i < partsToTransmit; i++) {
           Part part = (i == this.numberOfParts) ? securityPart : partsList[i];
 
           if (cb.remaining() < PART_HEADER_SIZE) {
             flushBuffer();
           }
-          
+
           int partLen = part.getLength();
           cb.putInt(partLen);
           cb.put(part.getTypeCode());
@@ -586,13 +589,11 @@ public class Message  {
           this.os.flush();
         }
       }
-      if(clearMessage) {
+    } finally {
+      if (clearMessage) {
         clearParts();
       }
     }
-    else {
-      throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
-    }
   }
 
   protected void flushBuffer() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/61ad7e44/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
index 5e52f96..1c3819e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
@@ -67,7 +67,12 @@ public class Part {
 
 
   public void clear() {
-    this.part = null;
+    if (this.part != null) {
+      if (this.part instanceof HeapDataOutputStream) {
+        ((HeapDataOutputStream)this.part).close();
+      }
+      this.part = null;
+    }
     this.typeCode = BYTE_CODE;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/61ad7e44/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
index 9f05aa7..24f665f 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
@@ -102,6 +102,24 @@ public class MessageJUnitTest {
     }
   }
 
+  /**
+   * geode-1468: Message should clear the chunks in its Parts when
+   * performing cleanup.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void streamBuffersAreClearedDuringCleanup() throws Exception {
+    Part[] parts = new Part[2];
+    Part mockPart1 = mock(Part.class);
+    when(mockPart1.getLength()).thenReturn(100);
+    parts[0] = mockPart1;
+    parts[1] = mockPart1;
+    message.setParts(parts);
+    message.clearParts();
+    verify(mockPart1, times(2)).clear();
+  }
+
   // TODO many more tests are needed
 
 }