You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/04/03 20:58:11 UTC
[geode] branch develop updated: GEODE-7946: Fix redis publish/subscribe leaking netty buffers (#4902)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 3c99931 GEODE-7946: Fix redis publish/subscribe leaking netty buffers (#4902)
3c99931 is described below
commit 3c99931995e2afe8f2800cc12a26b95b49745941
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Fri Apr 3 13:57:35 2020 -0700
GEODE-7946: Fix redis publish/subscribe leaking netty buffers (#4902)
Authored-by: Jens Deppe <jd...@vmware.com>
---
.../org/apache/geode/redis/internal/AbstractSubscription.java | 10 +++++++++-
.../redis/internal/executor/pubsub/PsubscribeExecutor.java | 5 ++++-
.../redis/internal/executor/pubsub/SubscribeExecutor.java | 5 ++++-
3 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
index c5d7a7a..1bc3e6f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
@@ -16,6 +16,7 @@
package org.apache.geode.redis.internal;
+import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ExecutionException;
import io.netty.buffer.ByteBuf;
@@ -81,7 +82,14 @@ public abstract class AbstractSubscription implements Subscription {
try {
channelFuture.get();
- } catch (InterruptedException | ExecutionException e) {
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof ClosedChannelException) {
+ logger.warn("Unable to write to channel: {}", e.getMessage());
+ } else {
+ logger.warn("Unable to write to channel", e);
+ }
+ return false;
+ } catch (InterruptedException e) {
logger.warn("Unable to write to channel", e);
return false;
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
index 5a91146..8e6cfba 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
@@ -62,7 +62,10 @@ public class PsubscribeExecutor extends AbstractExecutor {
} catch (CoderException e) {
logger.warn("Error encoding subscribe response", e);
}
- aggregatedResponse.writeBytes(response);
+ if (response != null) {
+ aggregatedResponse.writeBytes(response);
+ response.release();
+ }
});
command.setResponse(aggregatedResponse);
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
index 769babc..c98f91f 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
@@ -59,7 +59,10 @@ public class SubscribeExecutor extends AbstractExecutor {
} catch (CoderException e) {
logger.warn("Error encoding subscribe response", e);
}
- aggregatedResponse.writeBytes(response);
+ if (response != null) {
+ aggregatedResponse.writeBytes(response);
+ response.release();
+ }
});
command.setResponse(aggregatedResponse);
}