You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2021/03/16 18:55:04 UTC
[geode] 31/36: Add RedisDecoder to the netty pipeline
This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch feature/redis-performance-testing
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 420b28ce48cc8184703a11da86a444a7819fc10f
Author: Jens Deppe <jd...@vmware.com>
AuthorDate: Mon Mar 15 15:03:52 2021 -0700
Add RedisDecoder to the netty pipeline
---
.../internal/netty/MessageToCommandDecoder.java | 61 ++++++++++++++++++++++
.../redis/internal/netty/NettyRedisServer.java | 9 +++-
2 files changed, 68 insertions(+), 2 deletions(-)
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/MessageToCommandDecoder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/MessageToCommandDecoder.java
new file mode 100644
index 0000000..3f0685c
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/MessageToCommandDecoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.netty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.redis.ArrayHeaderRedisMessage;
+import io.netty.handler.codec.redis.FullBulkStringRedisMessage;
+import io.netty.handler.codec.redis.RedisMessage;
+
+/**
+ * Collects the pieces of a Redis array (an {@link ArrayHeaderRedisMessage} followed by its
+ * {@link FullBulkStringRedisMessage} elements) and emits one {@link Command} once every element
+ * announced by the header has arrived.
+ *
+ * <p>
+ * The partially assembled command is kept in instance fields, so each channel pipeline must be
+ * given its own decoder instance. State must not be kept per thread: a single event-loop thread
+ * may serve several channels, and a command that is split across reads on one channel would
+ * otherwise be mixed with elements belonging to another channel.
+ */
+public class MessageToCommandDecoder extends MessageToMessageDecoder<RedisMessage> {
+
+  /** Elements of the command currently being assembled, in arrival order. */
+  private List<byte[]> elements = new ArrayList<>();
+
+  /** Bulk strings still expected for the current command; zero or less means none in progress. */
+  private long remaining = 0;
+
+  /**
+   * Consumes one decoded Redis message and adds a {@link Command} to {@code out} when the last
+   * expected element of an array has been received.
+   *
+   * @param ctx the handler context (unused)
+   * @param msg an array header, a complete bulk string, or any other message (which is ignored)
+   * @param out receives the assembled {@link Command}, if one was completed by {@code msg}
+   */
+  @Override
+  protected void decode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out)
+      throws Exception {
+    if (msg instanceof ArrayHeaderRedisMessage) {
+      // A header always starts a new command; discard anything left over from an earlier,
+      // incomplete or malformed one so it cannot leak into this command.
+      elements = new ArrayList<>();
+      remaining = ((ArrayHeaderRedisMessage) msg).length();
+      return;
+    }
+
+    if (!(msg instanceof FullBulkStringRedisMessage)) {
+      return;
+    }
+
+    if (remaining <= 0) {
+      // Bulk string without a pending array header (or after an empty/null array): there is no
+      // command to attach it to, so drop it instead of corrupting the next command.
+      return;
+    }
+
+    ByteBuf buffer = ((FullBulkStringRedisMessage) msg).content();
+    byte[] data = new byte[buffer.readableBytes()];
+    buffer.readBytes(data);
+    elements.add(data);
+    remaining--;
+
+    if (remaining == 0) {
+      out.add(new Command(elements));
+      // The emitted Command now owns the list; start the next command with a fresh one.
+      elements = new ArrayList<>();
+    }
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
index 3a1d903..5110696 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
@@ -43,6 +43,8 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.redis.RedisBulkStringAggregator;
+import io.netty.handler.codec.redis.RedisDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.timeout.WriteTimeoutHandler;
@@ -169,8 +171,11 @@ public class NettyRedisServer {
}
ChannelPipeline pipeline = socketChannel.pipeline();
addSSLIfEnabled(socketChannel, pipeline);
- pipeline.addLast(ByteToCommandDecoder.class.getSimpleName(),
- new ByteToCommandDecoder(redisStats));
+ // pipeline.addLast(ByteToCommandDecoder.class.getSimpleName(),
+ // new ByteToCommandDecoder(redisStats));
+ pipeline.addLast(new RedisDecoder());
+ pipeline.addLast(new RedisBulkStringAggregator());
+ pipeline.addLast(new MessageToCommandDecoder());
pipeline.addLast(new WriteTimeoutHandler(10));
pipeline.addLast(ExecutionHandlerContext.class.getSimpleName(),
new ExecutionHandlerContext(socketChannel, regionProvider, pubsub,