You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/04/03 20:58:11 UTC

[geode] branch develop updated: GEODE-7946: Fix redis publish/subscribe leaking netty buffers (#4902)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3c99931  GEODE-7946: Fix redis publish/subscribe leaking netty buffers (#4902)
3c99931 is described below

commit 3c99931995e2afe8f2800cc12a26b95b49745941
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Fri Apr 3 13:57:35 2020 -0700

    GEODE-7946: Fix redis publish/subscribe leaking netty buffers (#4902)
    
    Authored-by: Jens Deppe <jd...@vmware.com>
---
 .../org/apache/geode/redis/internal/AbstractSubscription.java  | 10 +++++++++-
 .../redis/internal/executor/pubsub/PsubscribeExecutor.java     |  5 ++++-
 .../redis/internal/executor/pubsub/SubscribeExecutor.java      |  5 ++++-
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
index c5d7a7a..1bc3e6f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
@@ -16,6 +16,7 @@
 
 package org.apache.geode.redis.internal;
 
+import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.ExecutionException;
 
 import io.netty.buffer.ByteBuf;
@@ -81,7 +82,14 @@ public abstract class AbstractSubscription implements Subscription {
 
     try {
       channelFuture.get();
-    } catch (InterruptedException | ExecutionException e) {
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof ClosedChannelException) {
+        logger.warn("Unable to write to channel: {}", e.getMessage());
+      } else {
+        logger.warn("Unable to write to channel", e);
+      }
+      return false;
+    } catch (InterruptedException e) {
       logger.warn("Unable to write to channel", e);
       return false;
     }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
index 5a91146..8e6cfba 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
@@ -62,7 +62,10 @@ public class PsubscribeExecutor extends AbstractExecutor {
       } catch (CoderException e) {
         logger.warn("Error encoding subscribe response", e);
       }
-      aggregatedResponse.writeBytes(response);
+      if (response != null) {
+        aggregatedResponse.writeBytes(response);
+        response.release();
+      }
     });
     command.setResponse(aggregatedResponse);
   }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
index 769babc..c98f91f 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
@@ -59,7 +59,10 @@ public class SubscribeExecutor extends AbstractExecutor {
       } catch (CoderException e) {
         logger.warn("Error encoding subscribe response", e);
       }
-      aggregatedResponse.writeBytes(response);
+      if (response != null) {
+        aggregatedResponse.writeBytes(response);
+        response.release();
+      }
     });
     command.setResponse(aggregatedResponse);
   }